tipc: convert tipc reference table to use generic rhashtable
[firefly-linux-kernel-4.4.55.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/jhash.h>
39 #include "core.h"
40 #include "name_table.h"
41 #include "node.h"
42 #include "link.h"
43 #include "config.h"
44 #include "socket.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   3600000 /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_interval:
72  * @timer:
73  * @port: port - interacts with 'sk' and with the rest of the TIPC stack
74  * @peer_name: the peer of the connection, if any
75  * @conn_timeout: the time we can wait for an unresponded setup request
76  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
77  * @link_cong: non-zero if owner must sleep because of link congestion
78  * @sent_unacked: # messages sent by socket, and not yet acked by peer
79  * @rcv_unacked: # messages read by user, but not yet acked back to peer
80  * @node: hash table node
81  * @rcu: rcu struct for tipc_sock
82  */
83 struct tipc_sock {
84         struct sock sk;
85         int connected;
86         u32 conn_type;
87         u32 conn_instance;
88         int published;
89         u32 max_pkt;
90         u32 portid;
91         struct tipc_msg phdr;
92         struct list_head sock_list;
93         struct list_head publications;
94         u32 pub_count;
95         u32 probing_state;
96         u32 probing_interval;
97         struct timer_list timer;
98         uint conn_timeout;
99         atomic_t dupl_rcvcnt;
100         bool link_cong;
101         uint sent_unacked;
102         uint rcv_unacked;
103         struct rhash_head node;
104         struct rcu_head rcu;
105 };
106
107 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
108 static void tipc_data_ready(struct sock *sk);
109 static void tipc_write_space(struct sock *sk);
110 static int tipc_release(struct socket *sock);
111 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
112 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
113 static void tipc_sk_timeout(unsigned long portid);
114 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
115                            struct tipc_name_seq const *seq);
116 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
117                             struct tipc_name_seq const *seq);
118 static struct tipc_sock *tipc_sk_lookup(u32 portid);
119 static int tipc_sk_insert(struct tipc_sock *tsk);
120 static void tipc_sk_remove(struct tipc_sock *tsk);
121
122 static const struct proto_ops packet_ops;
123 static const struct proto_ops stream_ops;
124 static const struct proto_ops msg_ops;
125
126 static struct proto tipc_proto;
127 static struct proto tipc_proto_kern;
128
129 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
130         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
131         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
132         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
133         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
134         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
135 };
136
137 /*
138  * Revised TIPC socket locking policy:
139  *
140  * Most socket operations take the standard socket lock when they start
141  * and hold it until they finish (or until they need to sleep).  Acquiring
142  * this lock grants the owner exclusive access to the fields of the socket
143  * data structures, with the exception of the backlog queue.  A few socket
144  * operations can be done without taking the socket lock because they only
145  * read socket information that never changes during the life of the socket.
146  *
147  * Socket operations may acquire the lock for the associated TIPC port if they
148  * need to perform an operation on the port.  If any routine needs to acquire
149  * both the socket lock and the port lock it must take the socket lock first
150  * to avoid the risk of deadlock.
151  *
152  * The dispatcher handling incoming messages cannot grab the socket lock in
153  * the standard fashion, since invoked it runs at the BH level and cannot block.
154  * Instead, it checks to see if the socket lock is currently owned by someone,
155  * and either handles the message itself or adds it to the socket's backlog
156  * queue; in the latter case the queued message is processed once the process
157  * owning the socket lock releases it.
158  *
159  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
160  * the problem of a blocked socket operation preventing any other operations
161  * from occurring.  However, applications must be careful if they have
162  * multiple threads trying to send (or receive) on the same socket, as these
163  * operations might interfere with each other.  For example, doing a connect
164  * and a receive at the same time might allow the receive to consume the
165  * ACK message meant for the connect.  While additional work could be done
166  * to try and overcome this, it doesn't seem to be worthwhile at the present.
167  *
168  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
169  * that another operation that must be performed in a non-blocking manner is
170  * not delayed for very long because the lock has already been taken.
171  *
172  * NOTE: This code assumes that certain fields of a port/socket pair are
173  * constant over its lifetime; such fields can be examined without taking
174  * the socket lock and/or port lock, and do not need to be re-read even
175  * after resuming processing after waiting.  These fields include:
176  *   - socket type
177  *   - pointer to socket sk structure (aka tipc_sock structure)
178  *   - pointer to port structure
179  *   - port reference
180  */
181
182 /* Protects tipc socket hash table mutations */
183 static struct rhashtable tipc_sk_rht;
184
185 static u32 tsk_peer_node(struct tipc_sock *tsk)
186 {
187         return msg_destnode(&tsk->phdr);
188 }
189
190 static u32 tsk_peer_port(struct tipc_sock *tsk)
191 {
192         return msg_destport(&tsk->phdr);
193 }
194
195 static  bool tsk_unreliable(struct tipc_sock *tsk)
196 {
197         return msg_src_droppable(&tsk->phdr) != 0;
198 }
199
200 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
201 {
202         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
203 }
204
205 static bool tsk_unreturnable(struct tipc_sock *tsk)
206 {
207         return msg_dest_droppable(&tsk->phdr) != 0;
208 }
209
210 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
211 {
212         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
213 }
214
215 static int tsk_importance(struct tipc_sock *tsk)
216 {
217         return msg_importance(&tsk->phdr);
218 }
219
220 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
221 {
222         if (imp > TIPC_CRITICAL_IMPORTANCE)
223                 return -EINVAL;
224         msg_set_importance(&tsk->phdr, (u32)imp);
225         return 0;
226 }
227
228 static struct tipc_sock *tipc_sk(const struct sock *sk)
229 {
230         return container_of(sk, struct tipc_sock, sk);
231 }
232
233 static int tsk_conn_cong(struct tipc_sock *tsk)
234 {
235         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
236 }
237
238 /**
239  * tsk_advance_rx_queue - discard first buffer in socket receive queue
240  *
241  * Caller must hold socket lock
242  */
243 static void tsk_advance_rx_queue(struct sock *sk)
244 {
245         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
246 }
247
248 /**
249  * tsk_rej_rx_queue - reject all buffers in socket receive queue
250  *
251  * Caller must hold socket lock
252  */
253 static void tsk_rej_rx_queue(struct sock *sk)
254 {
255         struct sk_buff *skb;
256         u32 dnode;
257
258         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
259                 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
260                         tipc_link_xmit_skb(skb, dnode, 0);
261         }
262 }
263
264 /* tsk_peer_msg - verify if message was sent by connected port's peer
265  *
266  * Handles cases where the node's network address has changed from
267  * the default of <0.0.0> to its configured setting.
268  */
269 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
270 {
271         u32 peer_port = tsk_peer_port(tsk);
272         u32 orig_node;
273         u32 peer_node;
274
275         if (unlikely(!tsk->connected))
276                 return false;
277
278         if (unlikely(msg_origport(msg) != peer_port))
279                 return false;
280
281         orig_node = msg_orignode(msg);
282         peer_node = tsk_peer_node(tsk);
283
284         if (likely(orig_node == peer_node))
285                 return true;
286
287         if (!orig_node && (peer_node == tipc_own_addr))
288                 return true;
289
290         if (!peer_node && (orig_node == tipc_own_addr))
291                 return true;
292
293         return false;
294 }
295
296 /**
297  * tipc_sk_create - create a TIPC socket
298  * @net: network namespace (must be default network)
299  * @sock: pre-allocated socket structure
300  * @protocol: protocol indicator (must be 0)
301  * @kern: caused by kernel or by userspace?
302  *
303  * This routine creates additional data structures used by the TIPC socket,
304  * initializes them, and links them together.
305  *
306  * Returns 0 on success, errno otherwise
307  */
308 static int tipc_sk_create(struct net *net, struct socket *sock,
309                           int protocol, int kern)
310 {
311         const struct proto_ops *ops;
312         socket_state state;
313         struct sock *sk;
314         struct tipc_sock *tsk;
315         struct tipc_msg *msg;
316
317         /* Validate arguments */
318         if (unlikely(protocol != 0))
319                 return -EPROTONOSUPPORT;
320
321         switch (sock->type) {
322         case SOCK_STREAM:
323                 ops = &stream_ops;
324                 state = SS_UNCONNECTED;
325                 break;
326         case SOCK_SEQPACKET:
327                 ops = &packet_ops;
328                 state = SS_UNCONNECTED;
329                 break;
330         case SOCK_DGRAM:
331         case SOCK_RDM:
332                 ops = &msg_ops;
333                 state = SS_READY;
334                 break;
335         default:
336                 return -EPROTOTYPE;
337         }
338
339         /* Allocate socket's protocol area */
340         if (!kern)
341                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
342         else
343                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
344
345         if (sk == NULL)
346                 return -ENOMEM;
347
348         tsk = tipc_sk(sk);
349         tsk->max_pkt = MAX_PKT_DEFAULT;
350         INIT_LIST_HEAD(&tsk->publications);
351         msg = &tsk->phdr;
352         tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
353                       NAMED_H_SIZE, 0);
354
355         /* Finish initializing socket data structures */
356         sock->ops = ops;
357         sock->state = state;
358         sock_init_data(sock, sk);
359         if (tipc_sk_insert(tsk)) {
360                 pr_warn("Socket create failed; port numbrer exhausted\n");
361                 return -EINVAL;
362         }
363         msg_set_origport(msg, tsk->portid);
364         k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, tsk->portid);
365         sk->sk_backlog_rcv = tipc_backlog_rcv;
366         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
367         sk->sk_data_ready = tipc_data_ready;
368         sk->sk_write_space = tipc_write_space;
369         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
370         tsk->sent_unacked = 0;
371         atomic_set(&tsk->dupl_rcvcnt, 0);
372
373         if (sock->state == SS_READY) {
374                 tsk_set_unreturnable(tsk, true);
375                 if (sock->type == SOCK_DGRAM)
376                         tsk_set_unreliable(tsk, true);
377         }
378         return 0;
379 }
380
381 /**
382  * tipc_sock_create_local - create TIPC socket from inside TIPC module
383  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
384  *
385  * We cannot use sock_creat_kern here because it bumps module user count.
386  * Since socket owner and creator is the same module we must make sure
387  * that module count remains zero for module local sockets, otherwise
388  * we cannot do rmmod.
389  *
390  * Returns 0 on success, errno otherwise
391  */
392 int tipc_sock_create_local(int type, struct socket **res)
393 {
394         int rc;
395
396         rc = sock_create_lite(AF_TIPC, type, 0, res);
397         if (rc < 0) {
398                 pr_err("Failed to create kernel socket\n");
399                 return rc;
400         }
401         tipc_sk_create(&init_net, *res, 0, 1);
402
403         return 0;
404 }
405
406 /**
407  * tipc_sock_release_local - release socket created by tipc_sock_create_local
408  * @sock: the socket to be released.
409  *
410  * Module reference count is not incremented when such sockets are created,
411  * so we must keep it from being decremented when they are released.
412  */
413 void tipc_sock_release_local(struct socket *sock)
414 {
415         tipc_release(sock);
416         sock->ops = NULL;
417         sock_release(sock);
418 }
419
420 /**
421  * tipc_sock_accept_local - accept a connection on a socket created
422  * with tipc_sock_create_local. Use this function to avoid that
423  * module reference count is inadvertently incremented.
424  *
425  * @sock:    the accepting socket
426  * @newsock: reference to the new socket to be created
427  * @flags:   socket flags
428  */
429
430 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
431                            int flags)
432 {
433         struct sock *sk = sock->sk;
434         int ret;
435
436         ret = sock_create_lite(sk->sk_family, sk->sk_type,
437                                sk->sk_protocol, newsock);
438         if (ret < 0)
439                 return ret;
440
441         ret = tipc_accept(sock, *newsock, flags);
442         if (ret < 0) {
443                 sock_release(*newsock);
444                 return ret;
445         }
446         (*newsock)->ops = sock->ops;
447         return ret;
448 }
449
450 static void tipc_sk_callback(struct rcu_head *head)
451 {
452         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
453
454         sock_put(&tsk->sk);
455 }
456
457 /**
458  * tipc_release - destroy a TIPC socket
459  * @sock: socket to destroy
460  *
461  * This routine cleans up any messages that are still queued on the socket.
462  * For DGRAM and RDM socket types, all queued messages are rejected.
463  * For SEQPACKET and STREAM socket types, the first message is rejected
464  * and any others are discarded.  (If the first message on a STREAM socket
465  * is partially-read, it is discarded and the next one is rejected instead.)
466  *
467  * NOTE: Rejected messages are not necessarily returned to the sender!  They
468  * are returned or discarded according to the "destination droppable" setting
469  * specified for the message by the sender.
470  *
471  * Returns 0 on success, errno otherwise
472  */
473 static int tipc_release(struct socket *sock)
474 {
475         struct sock *sk = sock->sk;
476         struct tipc_sock *tsk;
477         struct sk_buff *skb;
478         u32 dnode;
479
480         /*
481          * Exit if socket isn't fully initialized (occurs when a failed accept()
482          * releases a pre-allocated child socket that was never used)
483          */
484         if (sk == NULL)
485                 return 0;
486
487         tsk = tipc_sk(sk);
488         lock_sock(sk);
489
490         /*
491          * Reject all unreceived messages, except on an active connection
492          * (which disconnects locally & sends a 'FIN+' to peer)
493          */
494         dnode = tsk_peer_node(tsk);
495         while (sock->state != SS_DISCONNECTING) {
496                 skb = __skb_dequeue(&sk->sk_receive_queue);
497                 if (skb == NULL)
498                         break;
499                 if (TIPC_SKB_CB(skb)->handle != NULL)
500                         kfree_skb(skb);
501                 else {
502                         if ((sock->state == SS_CONNECTING) ||
503                             (sock->state == SS_CONNECTED)) {
504                                 sock->state = SS_DISCONNECTING;
505                                 tsk->connected = 0;
506                                 tipc_node_remove_conn(dnode, tsk->portid);
507                         }
508                         if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
509                                 tipc_link_xmit_skb(skb, dnode, 0);
510                 }
511         }
512
513         tipc_sk_withdraw(tsk, 0, NULL);
514         k_cancel_timer(&tsk->timer);
515         tipc_sk_remove(tsk);
516         if (tsk->connected) {
517                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
518                                       SHORT_H_SIZE, 0, dnode, tipc_own_addr,
519                                       tsk_peer_port(tsk),
520                                       tsk->portid, TIPC_ERR_NO_PORT);
521                 if (skb)
522                         tipc_link_xmit_skb(skb, dnode, tsk->portid);
523                 tipc_node_remove_conn(dnode, tsk->portid);
524         }
525         k_term_timer(&tsk->timer);
526
527         /* Discard any remaining (connection-based) messages in receive queue */
528         __skb_queue_purge(&sk->sk_receive_queue);
529
530         /* Reject any messages that accumulated in backlog queue */
531         sock->state = SS_DISCONNECTING;
532         release_sock(sk);
533
534         call_rcu(&tsk->rcu, tipc_sk_callback);
535         sock->sk = NULL;
536
537         return 0;
538 }
539
540 /**
541  * tipc_bind - associate or disassocate TIPC name(s) with a socket
542  * @sock: socket structure
543  * @uaddr: socket address describing name(s) and desired operation
544  * @uaddr_len: size of socket address data structure
545  *
546  * Name and name sequence binding is indicated using a positive scope value;
547  * a negative scope value unbinds the specified name.  Specifying no name
548  * (i.e. a socket address length of 0) unbinds all names from the socket.
549  *
550  * Returns 0 on success, errno otherwise
551  *
552  * NOTE: This routine doesn't need to take the socket lock since it doesn't
553  *       access any non-constant socket information.
554  */
555 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
556                      int uaddr_len)
557 {
558         struct sock *sk = sock->sk;
559         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
560         struct tipc_sock *tsk = tipc_sk(sk);
561         int res = -EINVAL;
562
563         lock_sock(sk);
564         if (unlikely(!uaddr_len)) {
565                 res = tipc_sk_withdraw(tsk, 0, NULL);
566                 goto exit;
567         }
568
569         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
570                 res = -EINVAL;
571                 goto exit;
572         }
573         if (addr->family != AF_TIPC) {
574                 res = -EAFNOSUPPORT;
575                 goto exit;
576         }
577
578         if (addr->addrtype == TIPC_ADDR_NAME)
579                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
580         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
581                 res = -EAFNOSUPPORT;
582                 goto exit;
583         }
584
585         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
586             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
587             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
588                 res = -EACCES;
589                 goto exit;
590         }
591
592         res = (addr->scope > 0) ?
593                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
594                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
595 exit:
596         release_sock(sk);
597         return res;
598 }
599
600 /**
601  * tipc_getname - get port ID of socket or peer socket
602  * @sock: socket structure
603  * @uaddr: area for returned socket address
604  * @uaddr_len: area for returned length of socket address
605  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
606  *
607  * Returns 0 on success, errno otherwise
608  *
609  * NOTE: This routine doesn't need to take the socket lock since it only
610  *       accesses socket information that is unchanging (or which changes in
611  *       a completely predictable manner).
612  */
613 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
614                         int *uaddr_len, int peer)
615 {
616         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
617         struct tipc_sock *tsk = tipc_sk(sock->sk);
618
619         memset(addr, 0, sizeof(*addr));
620         if (peer) {
621                 if ((sock->state != SS_CONNECTED) &&
622                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
623                         return -ENOTCONN;
624                 addr->addr.id.ref = tsk_peer_port(tsk);
625                 addr->addr.id.node = tsk_peer_node(tsk);
626         } else {
627                 addr->addr.id.ref = tsk->portid;
628                 addr->addr.id.node = tipc_own_addr;
629         }
630
631         *uaddr_len = sizeof(*addr);
632         addr->addrtype = TIPC_ADDR_ID;
633         addr->family = AF_TIPC;
634         addr->scope = 0;
635         addr->addr.name.domain = 0;
636
637         return 0;
638 }
639
640 /**
641  * tipc_poll - read and possibly block on pollmask
642  * @file: file structure associated with the socket
643  * @sock: socket for which to calculate the poll bits
644  * @wait: ???
645  *
646  * Returns pollmask value
647  *
648  * COMMENTARY:
649  * It appears that the usual socket locking mechanisms are not useful here
650  * since the pollmask info is potentially out-of-date the moment this routine
651  * exits.  TCP and other protocols seem to rely on higher level poll routines
652  * to handle any preventable race conditions, so TIPC will do the same ...
653  *
654  * TIPC sets the returned events as follows:
655  *
656  * socket state         flags set
657  * ------------         ---------
658  * unconnected          no read flags
659  *                      POLLOUT if port is not congested
660  *
661  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
662  *                      no write flags
663  *
664  * connected            POLLIN/POLLRDNORM if data in rx queue
665  *                      POLLOUT if port is not congested
666  *
667  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
668  *                      no write flags
669  *
670  * listening            POLLIN if SYN in rx queue
671  *                      no write flags
672  *
673  * ready                POLLIN/POLLRDNORM if data in rx queue
674  * [connectionless]     POLLOUT (since port cannot be congested)
675  *
676  * IMPORTANT: The fact that a read or write operation is indicated does NOT
677  * imply that the operation will succeed, merely that it should be performed
678  * and will not block.
679  */
680 static unsigned int tipc_poll(struct file *file, struct socket *sock,
681                               poll_table *wait)
682 {
683         struct sock *sk = sock->sk;
684         struct tipc_sock *tsk = tipc_sk(sk);
685         u32 mask = 0;
686
687         sock_poll_wait(file, sk_sleep(sk), wait);
688
689         switch ((int)sock->state) {
690         case SS_UNCONNECTED:
691                 if (!tsk->link_cong)
692                         mask |= POLLOUT;
693                 break;
694         case SS_READY:
695         case SS_CONNECTED:
696                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
697                         mask |= POLLOUT;
698                 /* fall thru' */
699         case SS_CONNECTING:
700         case SS_LISTENING:
701                 if (!skb_queue_empty(&sk->sk_receive_queue))
702                         mask |= (POLLIN | POLLRDNORM);
703                 break;
704         case SS_DISCONNECTING:
705                 mask = (POLLIN | POLLRDNORM | POLLHUP);
706                 break;
707         }
708
709         return mask;
710 }
711
712 /**
713  * tipc_sendmcast - send multicast message
714  * @sock: socket structure
715  * @seq: destination address
716  * @msg: message to send
717  * @dsz: total length of message data
718  * @timeo: timeout to wait for wakeup
719  *
720  * Called from function tipc_sendmsg(), which has done all sanity checks
721  * Returns the number of bytes sent on success, or errno
722  */
723 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
724                           struct msghdr *msg, size_t dsz, long timeo)
725 {
726         struct sock *sk = sock->sk;
727         struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
728         struct sk_buff_head head;
729         uint mtu;
730         int rc;
731
732         msg_set_type(mhdr, TIPC_MCAST_MSG);
733         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
734         msg_set_destport(mhdr, 0);
735         msg_set_destnode(mhdr, 0);
736         msg_set_nametype(mhdr, seq->type);
737         msg_set_namelower(mhdr, seq->lower);
738         msg_set_nameupper(mhdr, seq->upper);
739         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
740
741 new_mtu:
742         mtu = tipc_bclink_get_mtu();
743         __skb_queue_head_init(&head);
744         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
745         if (unlikely(rc < 0))
746                 return rc;
747
748         do {
749                 rc = tipc_bclink_xmit(&head);
750                 if (likely(rc >= 0)) {
751                         rc = dsz;
752                         break;
753                 }
754                 if (rc == -EMSGSIZE)
755                         goto new_mtu;
756                 if (rc != -ELINKCONG)
757                         break;
758                 tipc_sk(sk)->link_cong = 1;
759                 rc = tipc_wait_for_sndmsg(sock, &timeo);
760                 if (rc)
761                         __skb_queue_purge(&head);
762         } while (!rc);
763         return rc;
764 }
765
766 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
767  */
768 void tipc_sk_mcast_rcv(struct sk_buff *buf)
769 {
770         struct tipc_msg *msg = buf_msg(buf);
771         struct tipc_port_list dports = {0, NULL, };
772         struct tipc_port_list *item;
773         struct sk_buff *b;
774         uint i, last, dst = 0;
775         u32 scope = TIPC_CLUSTER_SCOPE;
776
777         if (in_own_node(msg_orignode(msg)))
778                 scope = TIPC_NODE_SCOPE;
779
780         /* Create destination port list: */
781         tipc_nametbl_mc_translate(msg_nametype(msg),
782                                   msg_namelower(msg),
783                                   msg_nameupper(msg),
784                                   scope,
785                                   &dports);
786         last = dports.count;
787         if (!last) {
788                 kfree_skb(buf);
789                 return;
790         }
791
792         for (item = &dports; item; item = item->next) {
793                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
794                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
795                         if (!b) {
796                                 pr_warn("Failed do clone mcast rcv buffer\n");
797                                 continue;
798                         }
799                         msg_set_destport(msg, item->ports[i]);
800                         tipc_sk_rcv(b);
801                 }
802         }
803         tipc_port_list_free(&dports);
804 }
805
806 /**
807  * tipc_sk_proto_rcv - receive a connection mng protocol message
808  * @tsk: receiving socket
809  * @dnode: node to send response message to, if any
810  * @buf: buffer containing protocol message
811  * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
812  * (CONN_PROBE_REPLY) message should be forwarded.
813  */
814 static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
815                              struct sk_buff *buf)
816 {
817         struct tipc_msg *msg = buf_msg(buf);
818         int conn_cong;
819
820         /* Ignore if connection cannot be validated: */
821         if (!tsk_peer_msg(tsk, msg))
822                 goto exit;
823
824         tsk->probing_state = TIPC_CONN_OK;
825
826         if (msg_type(msg) == CONN_ACK) {
827                 conn_cong = tsk_conn_cong(tsk);
828                 tsk->sent_unacked -= msg_msgcnt(msg);
829                 if (conn_cong)
830                         tsk->sk.sk_write_space(&tsk->sk);
831         } else if (msg_type(msg) == CONN_PROBE) {
832                 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
833                         return TIPC_OK;
834                 msg_set_type(msg, CONN_PROBE_REPLY);
835                 return TIPC_FWD_MSG;
836         }
837         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
838 exit:
839         kfree_skb(buf);
840         return TIPC_OK;
841 }
842
843 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
844 {
845         struct sock *sk = sock->sk;
846         struct tipc_sock *tsk = tipc_sk(sk);
847         DEFINE_WAIT(wait);
848         int done;
849
850         do {
851                 int err = sock_error(sk);
852                 if (err)
853                         return err;
854                 if (sock->state == SS_DISCONNECTING)
855                         return -EPIPE;
856                 if (!*timeo_p)
857                         return -EAGAIN;
858                 if (signal_pending(current))
859                         return sock_intr_errno(*timeo_p);
860
861                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
862                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
863                 finish_wait(sk_sleep(sk), &wait);
864         } while (!done);
865         return 0;
866 }
867
868 /**
869  * tipc_sendmsg - send message in connectionless manner
870  * @iocb: if NULL, indicates that socket lock is already held
871  * @sock: socket structure
872  * @m: message to send
873  * @dsz: amount of user data to be sent
874  *
875  * Message must have an destination specified explicitly.
876  * Used for SOCK_RDM and SOCK_DGRAM messages,
877  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
878  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
879  *
880  * Returns the number of bytes sent on success, or errno otherwise
881  */
882 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
883                         struct msghdr *m, size_t dsz)
884 {
885         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
886         struct sock *sk = sock->sk;
887         struct tipc_sock *tsk = tipc_sk(sk);
888         struct tipc_msg *mhdr = &tsk->phdr;
889         u32 dnode, dport;
890         struct sk_buff_head head;
891         struct sk_buff *skb;
892         struct tipc_name_seq *seq = &dest->addr.nameseq;
893         u32 mtu;
894         long timeo;
895         int rc;
896
897         if (unlikely(!dest))
898                 return -EDESTADDRREQ;
899
900         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
901                      (dest->family != AF_TIPC)))
902                 return -EINVAL;
903
904         if (dsz > TIPC_MAX_USER_MSG_SIZE)
905                 return -EMSGSIZE;
906
907         if (iocb)
908                 lock_sock(sk);
909
910         if (unlikely(sock->state != SS_READY)) {
911                 if (sock->state == SS_LISTENING) {
912                         rc = -EPIPE;
913                         goto exit;
914                 }
915                 if (sock->state != SS_UNCONNECTED) {
916                         rc = -EISCONN;
917                         goto exit;
918                 }
919                 if (tsk->published) {
920                         rc = -EOPNOTSUPP;
921                         goto exit;
922                 }
923                 if (dest->addrtype == TIPC_ADDR_NAME) {
924                         tsk->conn_type = dest->addr.name.name.type;
925                         tsk->conn_instance = dest->addr.name.name.instance;
926                 }
927         }
928
929         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
930
931         if (dest->addrtype == TIPC_ADDR_MCAST) {
932                 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
933                 goto exit;
934         } else if (dest->addrtype == TIPC_ADDR_NAME) {
935                 u32 type = dest->addr.name.name.type;
936                 u32 inst = dest->addr.name.name.instance;
937                 u32 domain = dest->addr.name.domain;
938
939                 dnode = domain;
940                 msg_set_type(mhdr, TIPC_NAMED_MSG);
941                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
942                 msg_set_nametype(mhdr, type);
943                 msg_set_nameinst(mhdr, inst);
944                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
945                 dport = tipc_nametbl_translate(type, inst, &dnode);
946                 msg_set_destnode(mhdr, dnode);
947                 msg_set_destport(mhdr, dport);
948                 if (unlikely(!dport && !dnode)) {
949                         rc = -EHOSTUNREACH;
950                         goto exit;
951                 }
952         } else if (dest->addrtype == TIPC_ADDR_ID) {
953                 dnode = dest->addr.id.node;
954                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
955                 msg_set_lookup_scope(mhdr, 0);
956                 msg_set_destnode(mhdr, dnode);
957                 msg_set_destport(mhdr, dest->addr.id.ref);
958                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
959         }
960
961 new_mtu:
962         mtu = tipc_node_get_mtu(dnode, tsk->portid);
963         __skb_queue_head_init(&head);
964         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
965         if (rc < 0)
966                 goto exit;
967
968         do {
969                 skb = skb_peek(&head);
970                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
971                 rc = tipc_link_xmit(&head, dnode, tsk->portid);
972                 if (likely(rc >= 0)) {
973                         if (sock->state != SS_READY)
974                                 sock->state = SS_CONNECTING;
975                         rc = dsz;
976                         break;
977                 }
978                 if (rc == -EMSGSIZE)
979                         goto new_mtu;
980                 if (rc != -ELINKCONG)
981                         break;
982                 tsk->link_cong = 1;
983                 rc = tipc_wait_for_sndmsg(sock, &timeo);
984                 if (rc)
985                         __skb_queue_purge(&head);
986         } while (!rc);
987 exit:
988         if (iocb)
989                 release_sock(sk);
990
991         return rc;
992 }
993
994 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
995 {
996         struct sock *sk = sock->sk;
997         struct tipc_sock *tsk = tipc_sk(sk);
998         DEFINE_WAIT(wait);
999         int done;
1000
1001         do {
1002                 int err = sock_error(sk);
1003                 if (err)
1004                         return err;
1005                 if (sock->state == SS_DISCONNECTING)
1006                         return -EPIPE;
1007                 else if (sock->state != SS_CONNECTED)
1008                         return -ENOTCONN;
1009                 if (!*timeo_p)
1010                         return -EAGAIN;
1011                 if (signal_pending(current))
1012                         return sock_intr_errno(*timeo_p);
1013
1014                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1015                 done = sk_wait_event(sk, timeo_p,
1016                                      (!tsk->link_cong &&
1017                                       !tsk_conn_cong(tsk)) ||
1018                                      !tsk->connected);
1019                 finish_wait(sk_sleep(sk), &wait);
1020         } while (!done);
1021         return 0;
1022 }
1023
1024 /**
1025  * tipc_send_stream - send stream-oriented data
1026  * @iocb: (unused)
1027  * @sock: socket structure
1028  * @m: data to send
1029  * @dsz: total length of data to be transmitted
1030  *
1031  * Used for SOCK_STREAM data.
1032  *
1033  * Returns the number of bytes sent on success (or partial success),
1034  * or errno if no data sent
1035  */
1036 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1037                             struct msghdr *m, size_t dsz)
1038 {
1039         struct sock *sk = sock->sk;
1040         struct tipc_sock *tsk = tipc_sk(sk);
1041         struct tipc_msg *mhdr = &tsk->phdr;
1042         struct sk_buff_head head;
1043         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1044         u32 portid = tsk->portid;
1045         int rc = -EINVAL;
1046         long timeo;
1047         u32 dnode;
1048         uint mtu, send, sent = 0;
1049
1050         /* Handle implied connection establishment */
1051         if (unlikely(dest)) {
1052                 rc = tipc_sendmsg(iocb, sock, m, dsz);
1053                 if (dsz && (dsz == rc))
1054                         tsk->sent_unacked = 1;
1055                 return rc;
1056         }
1057         if (dsz > (uint)INT_MAX)
1058                 return -EMSGSIZE;
1059
1060         if (iocb)
1061                 lock_sock(sk);
1062
1063         if (unlikely(sock->state != SS_CONNECTED)) {
1064                 if (sock->state == SS_DISCONNECTING)
1065                         rc = -EPIPE;
1066                 else
1067                         rc = -ENOTCONN;
1068                 goto exit;
1069         }
1070
1071         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1072         dnode = tsk_peer_node(tsk);
1073
1074 next:
1075         mtu = tsk->max_pkt;
1076         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1077         __skb_queue_head_init(&head);
1078         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1079         if (unlikely(rc < 0))
1080                 goto exit;
1081         do {
1082                 if (likely(!tsk_conn_cong(tsk))) {
1083                         rc = tipc_link_xmit(&head, dnode, portid);
1084                         if (likely(!rc)) {
1085                                 tsk->sent_unacked++;
1086                                 sent += send;
1087                                 if (sent == dsz)
1088                                         break;
1089                                 goto next;
1090                         }
1091                         if (rc == -EMSGSIZE) {
1092                                 tsk->max_pkt = tipc_node_get_mtu(dnode, portid);
1093                                 goto next;
1094                         }
1095                         if (rc != -ELINKCONG)
1096                                 break;
1097                         tsk->link_cong = 1;
1098                 }
1099                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1100                 if (rc)
1101                         __skb_queue_purge(&head);
1102         } while (!rc);
1103 exit:
1104         if (iocb)
1105                 release_sock(sk);
1106         return sent ? sent : rc;
1107 }
1108
1109 /**
1110  * tipc_send_packet - send a connection-oriented message
1111  * @iocb: if NULL, indicates that socket lock is already held
1112  * @sock: socket structure
1113  * @m: message to send
1114  * @dsz: length of data to be transmitted
1115  *
1116  * Used for SOCK_SEQPACKET messages.
1117  *
1118  * Returns the number of bytes sent on success, or errno otherwise
1119  */
1120 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1121                             struct msghdr *m, size_t dsz)
1122 {
1123         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1124                 return -EMSGSIZE;
1125
1126         return tipc_send_stream(iocb, sock, m, dsz);
1127 }
1128
1129 /* tipc_sk_finish_conn - complete the setup of a connection
1130  */
1131 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1132                                 u32 peer_node)
1133 {
1134         struct tipc_msg *msg = &tsk->phdr;
1135
1136         msg_set_destnode(msg, peer_node);
1137         msg_set_destport(msg, peer_port);
1138         msg_set_type(msg, TIPC_CONN_MSG);
1139         msg_set_lookup_scope(msg, 0);
1140         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1141
1142         tsk->probing_interval = CONN_PROBING_INTERVAL;
1143         tsk->probing_state = TIPC_CONN_OK;
1144         tsk->connected = 1;
1145         k_start_timer(&tsk->timer, tsk->probing_interval);
1146         tipc_node_add_conn(peer_node, tsk->portid, peer_port);
1147         tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->portid);
1148 }
1149
1150 /**
1151  * set_orig_addr - capture sender's address for received message
1152  * @m: descriptor for message info
1153  * @msg: received message header
1154  *
1155  * Note: Address is not captured if not requested by receiver.
1156  */
1157 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1158 {
1159         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1160
1161         if (addr) {
1162                 addr->family = AF_TIPC;
1163                 addr->addrtype = TIPC_ADDR_ID;
1164                 memset(&addr->addr, 0, sizeof(addr->addr));
1165                 addr->addr.id.ref = msg_origport(msg);
1166                 addr->addr.id.node = msg_orignode(msg);
1167                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1168                 addr->scope = 0;                /* could leave uninitialized */
1169                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1170         }
1171 }
1172
1173 /**
1174  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1175  * @m: descriptor for message info
1176  * @msg: received message header
1177  * @tsk: TIPC port associated with message
1178  *
1179  * Note: Ancillary data is not captured if not requested by receiver.
1180  *
1181  * Returns 0 if successful, otherwise errno
1182  */
1183 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1184                                  struct tipc_sock *tsk)
1185 {
1186         u32 anc_data[3];
1187         u32 err;
1188         u32 dest_type;
1189         int has_name;
1190         int res;
1191
1192         if (likely(m->msg_controllen == 0))
1193                 return 0;
1194
1195         /* Optionally capture errored message object(s) */
1196         err = msg ? msg_errcode(msg) : 0;
1197         if (unlikely(err)) {
1198                 anc_data[0] = err;
1199                 anc_data[1] = msg_data_sz(msg);
1200                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1201                 if (res)
1202                         return res;
1203                 if (anc_data[1]) {
1204                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1205                                        msg_data(msg));
1206                         if (res)
1207                                 return res;
1208                 }
1209         }
1210
1211         /* Optionally capture message destination object */
1212         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1213         switch (dest_type) {
1214         case TIPC_NAMED_MSG:
1215                 has_name = 1;
1216                 anc_data[0] = msg_nametype(msg);
1217                 anc_data[1] = msg_namelower(msg);
1218                 anc_data[2] = msg_namelower(msg);
1219                 break;
1220         case TIPC_MCAST_MSG:
1221                 has_name = 1;
1222                 anc_data[0] = msg_nametype(msg);
1223                 anc_data[1] = msg_namelower(msg);
1224                 anc_data[2] = msg_nameupper(msg);
1225                 break;
1226         case TIPC_CONN_MSG:
1227                 has_name = (tsk->conn_type != 0);
1228                 anc_data[0] = tsk->conn_type;
1229                 anc_data[1] = tsk->conn_instance;
1230                 anc_data[2] = tsk->conn_instance;
1231                 break;
1232         default:
1233                 has_name = 0;
1234         }
1235         if (has_name) {
1236                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1237                 if (res)
1238                         return res;
1239         }
1240
1241         return 0;
1242 }
1243
1244 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1245 {
1246         struct sk_buff *skb = NULL;
1247         struct tipc_msg *msg;
1248         u32 peer_port = tsk_peer_port(tsk);
1249         u32 dnode = tsk_peer_node(tsk);
1250
1251         if (!tsk->connected)
1252                 return;
1253         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1254                               tipc_own_addr, peer_port, tsk->portid, TIPC_OK);
1255         if (!skb)
1256                 return;
1257         msg = buf_msg(skb);
1258         msg_set_msgcnt(msg, ack);
1259         tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
1260 }
1261
1262 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1263 {
1264         struct sock *sk = sock->sk;
1265         DEFINE_WAIT(wait);
1266         long timeo = *timeop;
1267         int err;
1268
1269         for (;;) {
1270                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1271                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1272                         if (sock->state == SS_DISCONNECTING) {
1273                                 err = -ENOTCONN;
1274                                 break;
1275                         }
1276                         release_sock(sk);
1277                         timeo = schedule_timeout(timeo);
1278                         lock_sock(sk);
1279                 }
1280                 err = 0;
1281                 if (!skb_queue_empty(&sk->sk_receive_queue))
1282                         break;
1283                 err = sock_intr_errno(timeo);
1284                 if (signal_pending(current))
1285                         break;
1286                 err = -EAGAIN;
1287                 if (!timeo)
1288                         break;
1289         }
1290         finish_wait(sk_sleep(sk), &wait);
1291         *timeop = timeo;
1292         return err;
1293 }
1294
1295 /**
1296  * tipc_recvmsg - receive packet-oriented message
1297  * @iocb: (unused)
1298  * @m: descriptor for message info
1299  * @buf_len: total size of user buffer area
1300  * @flags: receive flags
1301  *
1302  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1303  * If the complete message doesn't fit in user area, truncate it.
1304  *
1305  * Returns size of returned message data, errno otherwise
1306  */
1307 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1308                         struct msghdr *m, size_t buf_len, int flags)
1309 {
1310         struct sock *sk = sock->sk;
1311         struct tipc_sock *tsk = tipc_sk(sk);
1312         struct sk_buff *buf;
1313         struct tipc_msg *msg;
1314         long timeo;
1315         unsigned int sz;
1316         u32 err;
1317         int res;
1318
1319         /* Catch invalid receive requests */
1320         if (unlikely(!buf_len))
1321                 return -EINVAL;
1322
1323         lock_sock(sk);
1324
1325         if (unlikely(sock->state == SS_UNCONNECTED)) {
1326                 res = -ENOTCONN;
1327                 goto exit;
1328         }
1329
1330         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1331 restart:
1332
1333         /* Look for a message in receive queue; wait if necessary */
1334         res = tipc_wait_for_rcvmsg(sock, &timeo);
1335         if (res)
1336                 goto exit;
1337
1338         /* Look at first message in receive queue */
1339         buf = skb_peek(&sk->sk_receive_queue);
1340         msg = buf_msg(buf);
1341         sz = msg_data_sz(msg);
1342         err = msg_errcode(msg);
1343
1344         /* Discard an empty non-errored message & try again */
1345         if ((!sz) && (!err)) {
1346                 tsk_advance_rx_queue(sk);
1347                 goto restart;
1348         }
1349
1350         /* Capture sender's address (optional) */
1351         set_orig_addr(m, msg);
1352
1353         /* Capture ancillary data (optional) */
1354         res = tipc_sk_anc_data_recv(m, msg, tsk);
1355         if (res)
1356                 goto exit;
1357
1358         /* Capture message data (if valid) & compute return value (always) */
1359         if (!err) {
1360                 if (unlikely(buf_len < sz)) {
1361                         sz = buf_len;
1362                         m->msg_flags |= MSG_TRUNC;
1363                 }
1364                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1365                 if (res)
1366                         goto exit;
1367                 res = sz;
1368         } else {
1369                 if ((sock->state == SS_READY) ||
1370                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1371                         res = 0;
1372                 else
1373                         res = -ECONNRESET;
1374         }
1375
1376         /* Consume received message (optional) */
1377         if (likely(!(flags & MSG_PEEK))) {
1378                 if ((sock->state != SS_READY) &&
1379                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1380                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1381                         tsk->rcv_unacked = 0;
1382                 }
1383                 tsk_advance_rx_queue(sk);
1384         }
1385 exit:
1386         release_sock(sk);
1387         return res;
1388 }
1389
1390 /**
1391  * tipc_recv_stream - receive stream-oriented data
1392  * @iocb: (unused)
1393  * @m: descriptor for message info
1394  * @buf_len: total size of user buffer area
1395  * @flags: receive flags
1396  *
1397  * Used for SOCK_STREAM messages only.  If not enough data is available
1398  * will optionally wait for more; never truncates data.
1399  *
1400  * Returns size of returned message data, errno otherwise
1401  */
1402 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1403                             struct msghdr *m, size_t buf_len, int flags)
1404 {
1405         struct sock *sk = sock->sk;
1406         struct tipc_sock *tsk = tipc_sk(sk);
1407         struct sk_buff *buf;
1408         struct tipc_msg *msg;
1409         long timeo;
1410         unsigned int sz;
1411         int sz_to_copy, target, needed;
1412         int sz_copied = 0;
1413         u32 err;
1414         int res = 0;
1415
1416         /* Catch invalid receive attempts */
1417         if (unlikely(!buf_len))
1418                 return -EINVAL;
1419
1420         lock_sock(sk);
1421
1422         if (unlikely(sock->state == SS_UNCONNECTED)) {
1423                 res = -ENOTCONN;
1424                 goto exit;
1425         }
1426
1427         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1428         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1429
1430 restart:
1431         /* Look for a message in receive queue; wait if necessary */
1432         res = tipc_wait_for_rcvmsg(sock, &timeo);
1433         if (res)
1434                 goto exit;
1435
1436         /* Look at first message in receive queue */
1437         buf = skb_peek(&sk->sk_receive_queue);
1438         msg = buf_msg(buf);
1439         sz = msg_data_sz(msg);
1440         err = msg_errcode(msg);
1441
1442         /* Discard an empty non-errored message & try again */
1443         if ((!sz) && (!err)) {
1444                 tsk_advance_rx_queue(sk);
1445                 goto restart;
1446         }
1447
1448         /* Optionally capture sender's address & ancillary data of first msg */
1449         if (sz_copied == 0) {
1450                 set_orig_addr(m, msg);
1451                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1452                 if (res)
1453                         goto exit;
1454         }
1455
1456         /* Capture message data (if valid) & compute return value (always) */
1457         if (!err) {
1458                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1459
1460                 sz -= offset;
1461                 needed = (buf_len - sz_copied);
1462                 sz_to_copy = (sz <= needed) ? sz : needed;
1463
1464                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1465                                             m, sz_to_copy);
1466                 if (res)
1467                         goto exit;
1468
1469                 sz_copied += sz_to_copy;
1470
1471                 if (sz_to_copy < sz) {
1472                         if (!(flags & MSG_PEEK))
1473                                 TIPC_SKB_CB(buf)->handle =
1474                                 (void *)(unsigned long)(offset + sz_to_copy);
1475                         goto exit;
1476                 }
1477         } else {
1478                 if (sz_copied != 0)
1479                         goto exit; /* can't add error msg to valid data */
1480
1481                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1482                         res = 0;
1483                 else
1484                         res = -ECONNRESET;
1485         }
1486
1487         /* Consume received message (optional) */
1488         if (likely(!(flags & MSG_PEEK))) {
1489                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1490                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1491                         tsk->rcv_unacked = 0;
1492                 }
1493                 tsk_advance_rx_queue(sk);
1494         }
1495
1496         /* Loop around if more data is required */
1497         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1498             (!skb_queue_empty(&sk->sk_receive_queue) ||
1499             (sz_copied < target)) &&    /* and more is ready or required */
1500             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1501             (!err))                     /* and haven't reached a FIN */
1502                 goto restart;
1503
1504 exit:
1505         release_sock(sk);
1506         return sz_copied ? sz_copied : res;
1507 }
1508
1509 /**
1510  * tipc_write_space - wake up thread if port congestion is released
1511  * @sk: socket
1512  */
1513 static void tipc_write_space(struct sock *sk)
1514 {
1515         struct socket_wq *wq;
1516
1517         rcu_read_lock();
1518         wq = rcu_dereference(sk->sk_wq);
1519         if (wq_has_sleeper(wq))
1520                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1521                                                 POLLWRNORM | POLLWRBAND);
1522         rcu_read_unlock();
1523 }
1524
1525 /**
1526  * tipc_data_ready - wake up threads to indicate messages have been received
1527  * @sk: socket
1528  * @len: the length of messages
1529  */
1530 static void tipc_data_ready(struct sock *sk)
1531 {
1532         struct socket_wq *wq;
1533
1534         rcu_read_lock();
1535         wq = rcu_dereference(sk->sk_wq);
1536         if (wq_has_sleeper(wq))
1537                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1538                                                 POLLRDNORM | POLLRDBAND);
1539         rcu_read_unlock();
1540 }
1541
1542 /**
1543  * filter_connect - Handle all incoming messages for a connection-based socket
1544  * @tsk: TIPC socket
1545  * @msg: message
1546  *
1547  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1548  */
1549 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1550 {
1551         struct sock *sk = &tsk->sk;
1552         struct socket *sock = sk->sk_socket;
1553         struct tipc_msg *msg = buf_msg(*buf);
1554         int retval = -TIPC_ERR_NO_PORT;
1555
1556         if (msg_mcast(msg))
1557                 return retval;
1558
1559         switch ((int)sock->state) {
1560         case SS_CONNECTED:
1561                 /* Accept only connection-based messages sent by peer */
1562                 if (tsk_peer_msg(tsk, msg)) {
1563                         if (unlikely(msg_errcode(msg))) {
1564                                 sock->state = SS_DISCONNECTING;
1565                                 tsk->connected = 0;
1566                                 /* let timer expire on it's own */
1567                                 tipc_node_remove_conn(tsk_peer_node(tsk),
1568                                                       tsk->portid);
1569                         }
1570                         retval = TIPC_OK;
1571                 }
1572                 break;
1573         case SS_CONNECTING:
1574                 /* Accept only ACK or NACK message */
1575
1576                 if (unlikely(!msg_connected(msg)))
1577                         break;
1578
1579                 if (unlikely(msg_errcode(msg))) {
1580                         sock->state = SS_DISCONNECTING;
1581                         sk->sk_err = ECONNREFUSED;
1582                         retval = TIPC_OK;
1583                         break;
1584                 }
1585
1586                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1587                         sock->state = SS_DISCONNECTING;
1588                         sk->sk_err = EINVAL;
1589                         retval = TIPC_OK;
1590                         break;
1591                 }
1592
1593                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1594                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1595                 sock->state = SS_CONNECTED;
1596
1597                 /* If an incoming message is an 'ACK-', it should be
1598                  * discarded here because it doesn't contain useful
1599                  * data. In addition, we should try to wake up
1600                  * connect() routine if sleeping.
1601                  */
1602                 if (msg_data_sz(msg) == 0) {
1603                         kfree_skb(*buf);
1604                         *buf = NULL;
1605                         if (waitqueue_active(sk_sleep(sk)))
1606                                 wake_up_interruptible(sk_sleep(sk));
1607                 }
1608                 retval = TIPC_OK;
1609                 break;
1610         case SS_LISTENING:
1611         case SS_UNCONNECTED:
1612                 /* Accept only SYN message */
1613                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1614                         retval = TIPC_OK;
1615                 break;
1616         case SS_DISCONNECTING:
1617                 break;
1618         default:
1619                 pr_err("Unknown socket state %u\n", sock->state);
1620         }
1621         return retval;
1622 }
1623
1624 /**
1625  * rcvbuf_limit - get proper overload limit of socket receive queue
1626  * @sk: socket
1627  * @buf: message
1628  *
1629  * For all connection oriented messages, irrespective of importance,
1630  * the default overload value (i.e. 67MB) is set as limit.
1631  *
1632  * For all connectionless messages, by default new queue limits are
1633  * as belows:
1634  *
1635  * TIPC_LOW_IMPORTANCE       (4 MB)
1636  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1637  * TIPC_HIGH_IMPORTANCE      (16 MB)
1638  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1639  *
1640  * Returns overload limit according to corresponding message importance
1641  */
1642 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1643 {
1644         struct tipc_msg *msg = buf_msg(buf);
1645
1646         if (msg_connected(msg))
1647                 return sysctl_tipc_rmem[2];
1648
1649         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1650                 msg_importance(msg);
1651 }
1652
1653 /**
1654  * filter_rcv - validate incoming message
1655  * @sk: socket
1656  * @buf: message
1657  *
1658  * Enqueues message on receive queue if acceptable; optionally handles
1659  * disconnect indication for a connected socket.
1660  *
1661  * Called with socket lock already taken; port lock may also be taken.
1662  *
1663  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1664  * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1665  */
1666 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1667 {
1668         struct socket *sock = sk->sk_socket;
1669         struct tipc_sock *tsk = tipc_sk(sk);
1670         struct tipc_msg *msg = buf_msg(buf);
1671         unsigned int limit = rcvbuf_limit(sk, buf);
1672         u32 onode;
1673         int rc = TIPC_OK;
1674
1675         if (unlikely(msg_user(msg) == CONN_MANAGER))
1676                 return tipc_sk_proto_rcv(tsk, &onode, buf);
1677
1678         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1679                 kfree_skb(buf);
1680                 tsk->link_cong = 0;
1681                 sk->sk_write_space(sk);
1682                 return TIPC_OK;
1683         }
1684
1685         /* Reject message if it is wrong sort of message for socket */
1686         if (msg_type(msg) > TIPC_DIRECT_MSG)
1687                 return -TIPC_ERR_NO_PORT;
1688
1689         if (sock->state == SS_READY) {
1690                 if (msg_connected(msg))
1691                         return -TIPC_ERR_NO_PORT;
1692         } else {
1693                 rc = filter_connect(tsk, &buf);
1694                 if (rc != TIPC_OK || buf == NULL)
1695                         return rc;
1696         }
1697
1698         /* Reject message if there isn't room to queue it */
1699         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1700                 return -TIPC_ERR_OVERLOAD;
1701
1702         /* Enqueue message */
1703         TIPC_SKB_CB(buf)->handle = NULL;
1704         __skb_queue_tail(&sk->sk_receive_queue, buf);
1705         skb_set_owner_r(buf, sk);
1706
1707         sk->sk_data_ready(sk);
1708         return TIPC_OK;
1709 }
1710
1711 /**
1712  * tipc_backlog_rcv - handle incoming message from backlog queue
1713  * @sk: socket
1714  * @skb: message
1715  *
1716  * Caller must hold socket lock, but not port lock.
1717  *
1718  * Returns 0
1719  */
1720 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1721 {
1722         int rc;
1723         u32 onode;
1724         struct tipc_sock *tsk = tipc_sk(sk);
1725         uint truesize = skb->truesize;
1726
1727         rc = filter_rcv(sk, skb);
1728
1729         if (likely(!rc)) {
1730                 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1731                         atomic_add(truesize, &tsk->dupl_rcvcnt);
1732                 return 0;
1733         }
1734
1735         if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
1736                 return 0;
1737
1738         tipc_link_xmit_skb(skb, onode, 0);
1739
1740         return 0;
1741 }
1742
1743 /**
1744  * tipc_sk_rcv - handle incoming message
1745  * @skb: buffer containing arriving message
1746  * Consumes buffer
1747  * Returns 0 if success, or errno: -EHOSTUNREACH
1748  */
1749 int tipc_sk_rcv(struct sk_buff *skb)
1750 {
1751         struct tipc_sock *tsk;
1752         struct sock *sk;
1753         u32 dport = msg_destport(buf_msg(skb));
1754         int rc = TIPC_OK;
1755         uint limit;
1756         u32 dnode;
1757
1758         /* Validate destination and message */
1759         tsk = tipc_sk_lookup(dport);
1760         if (unlikely(!tsk)) {
1761                 rc = tipc_msg_eval(skb, &dnode);
1762                 goto exit;
1763         }
1764         sk = &tsk->sk;
1765
1766         /* Queue message */
1767         spin_lock_bh(&sk->sk_lock.slock);
1768
1769         if (!sock_owned_by_user(sk)) {
1770                 rc = filter_rcv(sk, skb);
1771         } else {
1772                 if (sk->sk_backlog.len == 0)
1773                         atomic_set(&tsk->dupl_rcvcnt, 0);
1774                 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1775                 if (sk_add_backlog(sk, skb, limit))
1776                         rc = -TIPC_ERR_OVERLOAD;
1777         }
1778         spin_unlock_bh(&sk->sk_lock.slock);
1779         sock_put(sk);
1780         if (likely(!rc))
1781                 return 0;
1782 exit:
1783         if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
1784                 return -EHOSTUNREACH;
1785
1786         tipc_link_xmit_skb(skb, dnode, 0);
1787         return (rc < 0) ? -EHOSTUNREACH : 0;
1788 }
1789
1790 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1791 {
1792         struct sock *sk = sock->sk;
1793         DEFINE_WAIT(wait);
1794         int done;
1795
1796         do {
1797                 int err = sock_error(sk);
1798                 if (err)
1799                         return err;
1800                 if (!*timeo_p)
1801                         return -ETIMEDOUT;
1802                 if (signal_pending(current))
1803                         return sock_intr_errno(*timeo_p);
1804
1805                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1806                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1807                 finish_wait(sk_sleep(sk), &wait);
1808         } while (!done);
1809         return 0;
1810 }
1811
1812 /**
1813  * tipc_connect - establish a connection to another TIPC port
1814  * @sock: socket structure
1815  * @dest: socket address for destination port
1816  * @destlen: size of socket address data structure
1817  * @flags: file-related flags associated with socket
1818  *
1819  * Returns 0 on success, errno otherwise
1820  */
1821 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1822                         int destlen, int flags)
1823 {
1824         struct sock *sk = sock->sk;
1825         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1826         struct msghdr m = {NULL,};
1827         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1828         socket_state previous;
1829         int res;
1830
1831         lock_sock(sk);
1832
1833         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1834         if (sock->state == SS_READY) {
1835                 res = -EOPNOTSUPP;
1836                 goto exit;
1837         }
1838
1839         /*
1840          * Reject connection attempt using multicast address
1841          *
1842          * Note: send_msg() validates the rest of the address fields,
1843          *       so there's no need to do it here
1844          */
1845         if (dst->addrtype == TIPC_ADDR_MCAST) {
1846                 res = -EINVAL;
1847                 goto exit;
1848         }
1849
1850         previous = sock->state;
1851         switch (sock->state) {
1852         case SS_UNCONNECTED:
1853                 /* Send a 'SYN-' to destination */
1854                 m.msg_name = dest;
1855                 m.msg_namelen = destlen;
1856
1857                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1858                  * indicate send_msg() is never blocked.
1859                  */
1860                 if (!timeout)
1861                         m.msg_flags = MSG_DONTWAIT;
1862
1863                 res = tipc_sendmsg(NULL, sock, &m, 0);
1864                 if ((res < 0) && (res != -EWOULDBLOCK))
1865                         goto exit;
1866
1867                 /* Just entered SS_CONNECTING state; the only
1868                  * difference is that return value in non-blocking
1869                  * case is EINPROGRESS, rather than EALREADY.
1870                  */
1871                 res = -EINPROGRESS;
1872         case SS_CONNECTING:
1873                 if (previous == SS_CONNECTING)
1874                         res = -EALREADY;
1875                 if (!timeout)
1876                         goto exit;
1877                 timeout = msecs_to_jiffies(timeout);
1878                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1879                 res = tipc_wait_for_connect(sock, &timeout);
1880                 break;
1881         case SS_CONNECTED:
1882                 res = -EISCONN;
1883                 break;
1884         default:
1885                 res = -EINVAL;
1886                 break;
1887         }
1888 exit:
1889         release_sock(sk);
1890         return res;
1891 }
1892
1893 /**
1894  * tipc_listen - allow socket to listen for incoming connections
1895  * @sock: socket structure
1896  * @len: (unused)
1897  *
1898  * Returns 0 on success, errno otherwise
1899  */
1900 static int tipc_listen(struct socket *sock, int len)
1901 {
1902         struct sock *sk = sock->sk;
1903         int res;
1904
1905         lock_sock(sk);
1906
1907         if (sock->state != SS_UNCONNECTED)
1908                 res = -EINVAL;
1909         else {
1910                 sock->state = SS_LISTENING;
1911                 res = 0;
1912         }
1913
1914         release_sock(sk);
1915         return res;
1916 }
1917
1918 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1919 {
1920         struct sock *sk = sock->sk;
1921         DEFINE_WAIT(wait);
1922         int err;
1923
1924         /* True wake-one mechanism for incoming connections: only
1925          * one process gets woken up, not the 'whole herd'.
1926          * Since we do not 'race & poll' for established sockets
1927          * anymore, the common case will execute the loop only once.
1928         */
1929         for (;;) {
1930                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1931                                           TASK_INTERRUPTIBLE);
1932                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1933                         release_sock(sk);
1934                         timeo = schedule_timeout(timeo);
1935                         lock_sock(sk);
1936                 }
1937                 err = 0;
1938                 if (!skb_queue_empty(&sk->sk_receive_queue))
1939                         break;
1940                 err = -EINVAL;
1941                 if (sock->state != SS_LISTENING)
1942                         break;
1943                 err = sock_intr_errno(timeo);
1944                 if (signal_pending(current))
1945                         break;
1946                 err = -EAGAIN;
1947                 if (!timeo)
1948                         break;
1949         }
1950         finish_wait(sk_sleep(sk), &wait);
1951         return err;
1952 }
1953
1954 /**
1955  * tipc_accept - wait for connection request
1956  * @sock: listening socket
1957  * @newsock: new socket that is to be connected
1958  * @flags: file-related flags associated with socket
1959  *
1960  * Returns 0 on success, errno otherwise
1961  */
1962 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1963 {
1964         struct sock *new_sk, *sk = sock->sk;
1965         struct sk_buff *buf;
1966         struct tipc_sock *new_tsock;
1967         struct tipc_msg *msg;
1968         long timeo;
1969         int res;
1970
1971         lock_sock(sk);
1972
1973         if (sock->state != SS_LISTENING) {
1974                 res = -EINVAL;
1975                 goto exit;
1976         }
1977         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1978         res = tipc_wait_for_accept(sock, timeo);
1979         if (res)
1980                 goto exit;
1981
1982         buf = skb_peek(&sk->sk_receive_queue);
1983
1984         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1985         if (res)
1986                 goto exit;
1987
1988         new_sk = new_sock->sk;
1989         new_tsock = tipc_sk(new_sk);
1990         msg = buf_msg(buf);
1991
1992         /* we lock on new_sk; but lockdep sees the lock on sk */
1993         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1994
1995         /*
1996          * Reject any stray messages received by new socket
1997          * before the socket lock was taken (very, very unlikely)
1998          */
1999         tsk_rej_rx_queue(new_sk);
2000
2001         /* Connect new socket to it's peer */
2002         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2003         new_sock->state = SS_CONNECTED;
2004
2005         tsk_set_importance(new_tsock, msg_importance(msg));
2006         if (msg_named(msg)) {
2007                 new_tsock->conn_type = msg_nametype(msg);
2008                 new_tsock->conn_instance = msg_nameinst(msg);
2009         }
2010
2011         /*
2012          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2013          * Respond to 'SYN+' by queuing it on new socket.
2014          */
2015         if (!msg_data_sz(msg)) {
2016                 struct msghdr m = {NULL,};
2017
2018                 tsk_advance_rx_queue(sk);
2019                 tipc_send_packet(NULL, new_sock, &m, 0);
2020         } else {
2021                 __skb_dequeue(&sk->sk_receive_queue);
2022                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2023                 skb_set_owner_r(buf, new_sk);
2024         }
2025         release_sock(new_sk);
2026 exit:
2027         release_sock(sk);
2028         return res;
2029 }
2030
2031 /**
2032  * tipc_shutdown - shutdown socket connection
2033  * @sock: socket structure
2034  * @how: direction to close (must be SHUT_RDWR)
2035  *
2036  * Terminates connection (if necessary), then purges socket's receive queue.
2037  *
2038  * Returns 0 on success, errno otherwise
2039  */
2040 static int tipc_shutdown(struct socket *sock, int how)
2041 {
2042         struct sock *sk = sock->sk;
2043         struct tipc_sock *tsk = tipc_sk(sk);
2044         struct sk_buff *skb;
2045         u32 dnode;
2046         int res;
2047
2048         if (how != SHUT_RDWR)
2049                 return -EINVAL;
2050
2051         lock_sock(sk);
2052
2053         switch (sock->state) {
2054         case SS_CONNECTING:
2055         case SS_CONNECTED:
2056
2057 restart:
2058                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2059                 skb = __skb_dequeue(&sk->sk_receive_queue);
2060                 if (skb) {
2061                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2062                                 kfree_skb(skb);
2063                                 goto restart;
2064                         }
2065                         if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
2066                                 tipc_link_xmit_skb(skb, dnode, tsk->portid);
2067                         tipc_node_remove_conn(dnode, tsk->portid);
2068                 } else {
2069                         dnode = tsk_peer_node(tsk);
2070                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2071                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2072                                               0, dnode, tipc_own_addr,
2073                                               tsk_peer_port(tsk),
2074                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2075                         tipc_link_xmit_skb(skb, dnode, tsk->portid);
2076                 }
2077                 tsk->connected = 0;
2078                 sock->state = SS_DISCONNECTING;
2079                 tipc_node_remove_conn(dnode, tsk->portid);
2080                 /* fall through */
2081
2082         case SS_DISCONNECTING:
2083
2084                 /* Discard any unreceived messages */
2085                 __skb_queue_purge(&sk->sk_receive_queue);
2086
2087                 /* Wake up anyone sleeping in poll */
2088                 sk->sk_state_change(sk);
2089                 res = 0;
2090                 break;
2091
2092         default:
2093                 res = -ENOTCONN;
2094         }
2095
2096         release_sock(sk);
2097         return res;
2098 }
2099
2100 static void tipc_sk_timeout(unsigned long portid)
2101 {
2102         struct tipc_sock *tsk;
2103         struct sock *sk;
2104         struct sk_buff *skb = NULL;
2105         u32 peer_port, peer_node;
2106
2107         tsk = tipc_sk_lookup(portid);
2108         if (!tsk)
2109                 return;
2110
2111         sk = &tsk->sk;
2112         bh_lock_sock(sk);
2113         if (!tsk->connected) {
2114                 bh_unlock_sock(sk);
2115                 goto exit;
2116         }
2117         peer_port = tsk_peer_port(tsk);
2118         peer_node = tsk_peer_node(tsk);
2119
2120         if (tsk->probing_state == TIPC_CONN_PROBING) {
2121                 /* Previous probe not answered -> self abort */
2122                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
2123                                       SHORT_H_SIZE, 0, tipc_own_addr,
2124                                       peer_node, portid, peer_port,
2125                                       TIPC_ERR_NO_PORT);
2126         } else {
2127                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2128                                       0, peer_node, tipc_own_addr,
2129                                       peer_port, portid, TIPC_OK);
2130                 tsk->probing_state = TIPC_CONN_PROBING;
2131                 k_start_timer(&tsk->timer, tsk->probing_interval);
2132         }
2133         bh_unlock_sock(sk);
2134         if (skb)
2135                 tipc_link_xmit_skb(skb, peer_node, portid);
2136 exit:
2137         sock_put(sk);
2138 }
2139
2140 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2141                            struct tipc_name_seq const *seq)
2142 {
2143         struct publication *publ;
2144         u32 key;
2145
2146         if (tsk->connected)
2147                 return -EINVAL;
2148         key = tsk->portid + tsk->pub_count + 1;
2149         if (key == tsk->portid)
2150                 return -EADDRINUSE;
2151
2152         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
2153                                     scope, tsk->portid, key);
2154         if (unlikely(!publ))
2155                 return -EINVAL;
2156
2157         list_add(&publ->pport_list, &tsk->publications);
2158         tsk->pub_count++;
2159         tsk->published = 1;
2160         return 0;
2161 }
2162
2163 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2164                             struct tipc_name_seq const *seq)
2165 {
2166         struct publication *publ;
2167         struct publication *safe;
2168         int rc = -EINVAL;
2169
2170         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2171                 if (seq) {
2172                         if (publ->scope != scope)
2173                                 continue;
2174                         if (publ->type != seq->type)
2175                                 continue;
2176                         if (publ->lower != seq->lower)
2177                                 continue;
2178                         if (publ->upper != seq->upper)
2179                                 break;
2180                         tipc_nametbl_withdraw(publ->type, publ->lower,
2181                                               publ->ref, publ->key);
2182                         rc = 0;
2183                         break;
2184                 }
2185                 tipc_nametbl_withdraw(publ->type, publ->lower,
2186                                       publ->ref, publ->key);
2187                 rc = 0;
2188         }
2189         if (list_empty(&tsk->publications))
2190                 tsk->published = 0;
2191         return rc;
2192 }
2193
2194 static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2195                         int len, int full_id)
2196 {
2197         struct publication *publ;
2198         int ret;
2199
2200         if (full_id)
2201                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2202                                     tipc_zone(tipc_own_addr),
2203                                     tipc_cluster(tipc_own_addr),
2204                                     tipc_node(tipc_own_addr), tsk->portid);
2205         else
2206                 ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
2207
2208         if (tsk->connected) {
2209                 u32 dport = tsk_peer_port(tsk);
2210                 u32 destnode = tsk_peer_node(tsk);
2211
2212                 ret += tipc_snprintf(buf + ret, len - ret,
2213                                      " connected to <%u.%u.%u:%u>",
2214                                      tipc_zone(destnode),
2215                                      tipc_cluster(destnode),
2216                                      tipc_node(destnode), dport);
2217                 if (tsk->conn_type != 0)
2218                         ret += tipc_snprintf(buf + ret, len - ret,
2219                                              " via {%u,%u}", tsk->conn_type,
2220                                              tsk->conn_instance);
2221         } else if (tsk->published) {
2222                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2223                 list_for_each_entry(publ, &tsk->publications, pport_list) {
2224                         if (publ->lower == publ->upper)
2225                                 ret += tipc_snprintf(buf + ret, len - ret,
2226                                                      " {%u,%u}", publ->type,
2227                                                      publ->lower);
2228                         else
2229                                 ret += tipc_snprintf(buf + ret, len - ret,
2230                                                      " {%u,%u,%u}", publ->type,
2231                                                      publ->lower, publ->upper);
2232                 }
2233         }
2234         ret += tipc_snprintf(buf + ret, len - ret, "\n");
2235         return ret;
2236 }
2237
2238 struct sk_buff *tipc_sk_socks_show(void)
2239 {
2240         const struct bucket_table *tbl;
2241         struct rhash_head *pos;
2242         struct sk_buff *buf;
2243         struct tlv_desc *rep_tlv;
2244         char *pb;
2245         int pb_len;
2246         struct tipc_sock *tsk;
2247         int str_len = 0;
2248         int i;
2249
2250         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2251         if (!buf)
2252                 return NULL;
2253         rep_tlv = (struct tlv_desc *)buf->data;
2254         pb = TLV_DATA(rep_tlv);
2255         pb_len = ULTRA_STRING_MAX_LEN;
2256
2257         rcu_read_lock();
2258         tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2259         for (i = 0; i < tbl->size; i++) {
2260                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2261                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2262                         str_len += tipc_sk_show(tsk, pb + str_len,
2263                                                 pb_len - str_len, 0);
2264                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2265                 }
2266         }
2267         rcu_read_unlock();
2268
2269         str_len += 1;   /* for "\0" */
2270         skb_put(buf, TLV_SPACE(str_len));
2271         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2272
2273         return buf;
2274 }
2275
2276 /* tipc_sk_reinit: set non-zero address in all existing sockets
2277  *                 when we go from standalone to network mode.
2278  */
2279 void tipc_sk_reinit(void)
2280 {
2281         const struct bucket_table *tbl;
2282         struct rhash_head *pos;
2283         struct tipc_sock *tsk;
2284         struct tipc_msg *msg;
2285         int i;
2286
2287         rcu_read_lock();
2288         tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2289         for (i = 0; i < tbl->size; i++) {
2290                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2291                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2292                         msg = &tsk->phdr;
2293                         msg_set_prevnode(msg, tipc_own_addr);
2294                         msg_set_orignode(msg, tipc_own_addr);
2295                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2296                 }
2297         }
2298         rcu_read_unlock();
2299 }
2300
2301 static struct tipc_sock *tipc_sk_lookup(u32 portid)
2302 {
2303         struct tipc_sock *tsk;
2304
2305         rcu_read_lock();
2306         tsk = rhashtable_lookup(&tipc_sk_rht, &portid);
2307         if (tsk)
2308                 sock_hold(&tsk->sk);
2309         rcu_read_unlock();
2310
2311         return tsk;
2312 }
2313
2314 static int tipc_sk_insert(struct tipc_sock *tsk)
2315 {
2316         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2317         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2318
2319         while (remaining--) {
2320                 portid++;
2321                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2322                         portid = TIPC_MIN_PORT;
2323                 tsk->portid = portid;
2324                 sock_hold(&tsk->sk);
2325                 if (rhashtable_lookup_insert(&tipc_sk_rht, &tsk->node))
2326                         return 0;
2327                 sock_put(&tsk->sk);
2328         }
2329
2330         return -1;
2331 }
2332
2333 static void tipc_sk_remove(struct tipc_sock *tsk)
2334 {
2335         struct sock *sk = &tsk->sk;
2336
2337         if (rhashtable_remove(&tipc_sk_rht, &tsk->node)) {
2338                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2339                 __sock_put(sk);
2340         }
2341 }
2342
2343 int tipc_sk_rht_init(void)
2344 {
2345         struct rhashtable_params rht_params = {
2346                 .nelem_hint = 192,
2347                 .head_offset = offsetof(struct tipc_sock, node),
2348                 .key_offset = offsetof(struct tipc_sock, portid),
2349                 .key_len = sizeof(u32), /* portid */
2350                 .hashfn = jhash,
2351                 .max_shift = 20, /* 1M */
2352                 .min_shift = 8,  /* 256 */
2353                 .grow_decision = rht_grow_above_75,
2354                 .shrink_decision = rht_shrink_below_30,
2355         };
2356
2357         return rhashtable_init(&tipc_sk_rht, &rht_params);
2358 }
2359
2360 void tipc_sk_rht_destroy(void)
2361 {
2362         /* Wait for socket readers to complete */
2363         synchronize_net();
2364
2365         rhashtable_destroy(&tipc_sk_rht);
2366 }
2367
2368 /**
2369  * tipc_setsockopt - set socket option
2370  * @sock: socket structure
2371  * @lvl: option level
2372  * @opt: option identifier
2373  * @ov: pointer to new option value
2374  * @ol: length of option value
2375  *
2376  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2377  * (to ease compatibility).
2378  *
2379  * Returns 0 on success, errno otherwise
2380  */
2381 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2382                            char __user *ov, unsigned int ol)
2383 {
2384         struct sock *sk = sock->sk;
2385         struct tipc_sock *tsk = tipc_sk(sk);
2386         u32 value;
2387         int res;
2388
2389         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2390                 return 0;
2391         if (lvl != SOL_TIPC)
2392                 return -ENOPROTOOPT;
2393         if (ol < sizeof(value))
2394                 return -EINVAL;
2395         res = get_user(value, (u32 __user *)ov);
2396         if (res)
2397                 return res;
2398
2399         lock_sock(sk);
2400
2401         switch (opt) {
2402         case TIPC_IMPORTANCE:
2403                 res = tsk_set_importance(tsk, value);
2404                 break;
2405         case TIPC_SRC_DROPPABLE:
2406                 if (sock->type != SOCK_STREAM)
2407                         tsk_set_unreliable(tsk, value);
2408                 else
2409                         res = -ENOPROTOOPT;
2410                 break;
2411         case TIPC_DEST_DROPPABLE:
2412                 tsk_set_unreturnable(tsk, value);
2413                 break;
2414         case TIPC_CONN_TIMEOUT:
2415                 tipc_sk(sk)->conn_timeout = value;
2416                 /* no need to set "res", since already 0 at this point */
2417                 break;
2418         default:
2419                 res = -EINVAL;
2420         }
2421
2422         release_sock(sk);
2423
2424         return res;
2425 }
2426
2427 /**
2428  * tipc_getsockopt - get socket option
2429  * @sock: socket structure
2430  * @lvl: option level
2431  * @opt: option identifier
2432  * @ov: receptacle for option value
2433  * @ol: receptacle for length of option value
2434  *
2435  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2436  * (to ease compatibility).
2437  *
2438  * Returns 0 on success, errno otherwise
2439  */
2440 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2441                            char __user *ov, int __user *ol)
2442 {
2443         struct sock *sk = sock->sk;
2444         struct tipc_sock *tsk = tipc_sk(sk);
2445         int len;
2446         u32 value;
2447         int res;
2448
2449         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2450                 return put_user(0, ol);
2451         if (lvl != SOL_TIPC)
2452                 return -ENOPROTOOPT;
2453         res = get_user(len, ol);
2454         if (res)
2455                 return res;
2456
2457         lock_sock(sk);
2458
2459         switch (opt) {
2460         case TIPC_IMPORTANCE:
2461                 value = tsk_importance(tsk);
2462                 break;
2463         case TIPC_SRC_DROPPABLE:
2464                 value = tsk_unreliable(tsk);
2465                 break;
2466         case TIPC_DEST_DROPPABLE:
2467                 value = tsk_unreturnable(tsk);
2468                 break;
2469         case TIPC_CONN_TIMEOUT:
2470                 value = tsk->conn_timeout;
2471                 /* no need to set "res", since already 0 at this point */
2472                 break;
2473         case TIPC_NODE_RECVQ_DEPTH:
2474                 value = 0; /* was tipc_queue_size, now obsolete */
2475                 break;
2476         case TIPC_SOCK_RECVQ_DEPTH:
2477                 value = skb_queue_len(&sk->sk_receive_queue);
2478                 break;
2479         default:
2480                 res = -EINVAL;
2481         }
2482
2483         release_sock(sk);
2484
2485         if (res)
2486                 return res;     /* "get" failed */
2487
2488         if (len < sizeof(value))
2489                 return -EINVAL;
2490
2491         if (copy_to_user(ov, &value, sizeof(value)))
2492                 return -EFAULT;
2493
2494         return put_user(sizeof(value), ol);
2495 }
2496
2497 static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
2498 {
2499         struct tipc_sioc_ln_req lnr;
2500         void __user *argp = (void __user *)arg;
2501
2502         switch (cmd) {
2503         case SIOCGETLINKNAME:
2504                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2505                         return -EFAULT;
2506                 if (!tipc_node_get_linkname(lnr.bearer_id & 0xffff, lnr.peer,
2507                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2508                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2509                                 return -EFAULT;
2510                         return 0;
2511                 }
2512                 return -EADDRNOTAVAIL;
2513         default:
2514                 return -ENOIOCTLCMD;
2515         }
2516 }
2517
2518 /* Protocol switches for the various types of TIPC sockets */
2519
2520 static const struct proto_ops msg_ops = {
2521         .owner          = THIS_MODULE,
2522         .family         = AF_TIPC,
2523         .release        = tipc_release,
2524         .bind           = tipc_bind,
2525         .connect        = tipc_connect,
2526         .socketpair     = sock_no_socketpair,
2527         .accept         = sock_no_accept,
2528         .getname        = tipc_getname,
2529         .poll           = tipc_poll,
2530         .ioctl          = tipc_ioctl,
2531         .listen         = sock_no_listen,
2532         .shutdown       = tipc_shutdown,
2533         .setsockopt     = tipc_setsockopt,
2534         .getsockopt     = tipc_getsockopt,
2535         .sendmsg        = tipc_sendmsg,
2536         .recvmsg        = tipc_recvmsg,
2537         .mmap           = sock_no_mmap,
2538         .sendpage       = sock_no_sendpage
2539 };
2540
2541 static const struct proto_ops packet_ops = {
2542         .owner          = THIS_MODULE,
2543         .family         = AF_TIPC,
2544         .release        = tipc_release,
2545         .bind           = tipc_bind,
2546         .connect        = tipc_connect,
2547         .socketpair     = sock_no_socketpair,
2548         .accept         = tipc_accept,
2549         .getname        = tipc_getname,
2550         .poll           = tipc_poll,
2551         .ioctl          = tipc_ioctl,
2552         .listen         = tipc_listen,
2553         .shutdown       = tipc_shutdown,
2554         .setsockopt     = tipc_setsockopt,
2555         .getsockopt     = tipc_getsockopt,
2556         .sendmsg        = tipc_send_packet,
2557         .recvmsg        = tipc_recvmsg,
2558         .mmap           = sock_no_mmap,
2559         .sendpage       = sock_no_sendpage
2560 };
2561
2562 static const struct proto_ops stream_ops = {
2563         .owner          = THIS_MODULE,
2564         .family         = AF_TIPC,
2565         .release        = tipc_release,
2566         .bind           = tipc_bind,
2567         .connect        = tipc_connect,
2568         .socketpair     = sock_no_socketpair,
2569         .accept         = tipc_accept,
2570         .getname        = tipc_getname,
2571         .poll           = tipc_poll,
2572         .ioctl          = tipc_ioctl,
2573         .listen         = tipc_listen,
2574         .shutdown       = tipc_shutdown,
2575         .setsockopt     = tipc_setsockopt,
2576         .getsockopt     = tipc_getsockopt,
2577         .sendmsg        = tipc_send_stream,
2578         .recvmsg        = tipc_recv_stream,
2579         .mmap           = sock_no_mmap,
2580         .sendpage       = sock_no_sendpage
2581 };
2582
2583 static const struct net_proto_family tipc_family_ops = {
2584         .owner          = THIS_MODULE,
2585         .family         = AF_TIPC,
2586         .create         = tipc_sk_create
2587 };
2588
2589 static struct proto tipc_proto = {
2590         .name           = "TIPC",
2591         .owner          = THIS_MODULE,
2592         .obj_size       = sizeof(struct tipc_sock),
2593         .sysctl_rmem    = sysctl_tipc_rmem
2594 };
2595
2596 static struct proto tipc_proto_kern = {
2597         .name           = "TIPC",
2598         .obj_size       = sizeof(struct tipc_sock),
2599         .sysctl_rmem    = sysctl_tipc_rmem
2600 };
2601
2602 /**
2603  * tipc_socket_init - initialize TIPC socket interface
2604  *
2605  * Returns 0 on success, errno otherwise
2606  */
2607 int tipc_socket_init(void)
2608 {
2609         int res;
2610
2611         res = proto_register(&tipc_proto, 1);
2612         if (res) {
2613                 pr_err("Failed to register TIPC protocol type\n");
2614                 goto out;
2615         }
2616
2617         res = sock_register(&tipc_family_ops);
2618         if (res) {
2619                 pr_err("Failed to register TIPC socket type\n");
2620                 proto_unregister(&tipc_proto);
2621                 goto out;
2622         }
2623  out:
2624         return res;
2625 }
2626
2627 /**
2628  * tipc_socket_stop - stop TIPC socket interface
2629  */
2630 void tipc_socket_stop(void)
2631 {
2632         sock_unregister(tipc_family_ops.family);
2633         proto_unregister(&tipc_proto);
2634 }
2635
2636 /* Caller should hold socket lock for the passed tipc socket. */
2637 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2638 {
2639         u32 peer_node;
2640         u32 peer_port;
2641         struct nlattr *nest;
2642
2643         peer_node = tsk_peer_node(tsk);
2644         peer_port = tsk_peer_port(tsk);
2645
2646         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2647
2648         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2649                 goto msg_full;
2650         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2651                 goto msg_full;
2652
2653         if (tsk->conn_type != 0) {
2654                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2655                         goto msg_full;
2656                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2657                         goto msg_full;
2658                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2659                         goto msg_full;
2660         }
2661         nla_nest_end(skb, nest);
2662
2663         return 0;
2664
2665 msg_full:
2666         nla_nest_cancel(skb, nest);
2667
2668         return -EMSGSIZE;
2669 }
2670
2671 /* Caller should hold socket lock for the passed tipc socket. */
2672 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2673                             struct tipc_sock *tsk)
2674 {
2675         int err;
2676         void *hdr;
2677         struct nlattr *attrs;
2678
2679         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2680                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2681         if (!hdr)
2682                 goto msg_cancel;
2683
2684         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2685         if (!attrs)
2686                 goto genlmsg_cancel;
2687         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2688                 goto attr_msg_cancel;
2689         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
2690                 goto attr_msg_cancel;
2691
2692         if (tsk->connected) {
2693                 err = __tipc_nl_add_sk_con(skb, tsk);
2694                 if (err)
2695                         goto attr_msg_cancel;
2696         } else if (!list_empty(&tsk->publications)) {
2697                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2698                         goto attr_msg_cancel;
2699         }
2700         nla_nest_end(skb, attrs);
2701         genlmsg_end(skb, hdr);
2702
2703         return 0;
2704
2705 attr_msg_cancel:
2706         nla_nest_cancel(skb, attrs);
2707 genlmsg_cancel:
2708         genlmsg_cancel(skb, hdr);
2709 msg_cancel:
2710         return -EMSGSIZE;
2711 }
2712
2713 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2714 {
2715         int err;
2716         struct tipc_sock *tsk;
2717         const struct bucket_table *tbl;
2718         struct rhash_head *pos;
2719         u32 prev_portid = cb->args[0];
2720         u32 portid = prev_portid;
2721         int i;
2722
2723         rcu_read_lock();
2724         tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2725         for (i = 0; i < tbl->size; i++) {
2726                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2727                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2728                         portid = tsk->portid;
2729                         err = __tipc_nl_add_sk(skb, cb, tsk);
2730                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2731                         if (err)
2732                                 break;
2733
2734                         prev_portid = portid;
2735                 }
2736         }
2737         rcu_read_unlock();
2738
2739         cb->args[0] = prev_portid;
2740
2741         return skb->len;
2742 }
2743
2744 /* Caller should hold socket lock for the passed tipc socket. */
2745 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2746                                  struct netlink_callback *cb,
2747                                  struct publication *publ)
2748 {
2749         void *hdr;
2750         struct nlattr *attrs;
2751
2752         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2753                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2754         if (!hdr)
2755                 goto msg_cancel;
2756
2757         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2758         if (!attrs)
2759                 goto genlmsg_cancel;
2760
2761         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2762                 goto attr_msg_cancel;
2763         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2764                 goto attr_msg_cancel;
2765         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2766                 goto attr_msg_cancel;
2767         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2768                 goto attr_msg_cancel;
2769
2770         nla_nest_end(skb, attrs);
2771         genlmsg_end(skb, hdr);
2772
2773         return 0;
2774
2775 attr_msg_cancel:
2776         nla_nest_cancel(skb, attrs);
2777 genlmsg_cancel:
2778         genlmsg_cancel(skb, hdr);
2779 msg_cancel:
2780         return -EMSGSIZE;
2781 }
2782
2783 /* Caller should hold socket lock for the passed tipc socket. */
2784 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2785                                   struct netlink_callback *cb,
2786                                   struct tipc_sock *tsk, u32 *last_publ)
2787 {
2788         int err;
2789         struct publication *p;
2790
2791         if (*last_publ) {
2792                 list_for_each_entry(p, &tsk->publications, pport_list) {
2793                         if (p->key == *last_publ)
2794                                 break;
2795                 }
2796                 if (p->key != *last_publ) {
2797                         /* We never set seq or call nl_dump_check_consistent()
2798                          * this means that setting prev_seq here will cause the
2799                          * consistence check to fail in the netlink callback
2800                          * handler. Resulting in the last NLMSG_DONE message
2801                          * having the NLM_F_DUMP_INTR flag set.
2802                          */
2803                         cb->prev_seq = 1;
2804                         *last_publ = 0;
2805                         return -EPIPE;
2806                 }
2807         } else {
2808                 p = list_first_entry(&tsk->publications, struct publication,
2809                                      pport_list);
2810         }
2811
2812         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2813                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2814                 if (err) {
2815                         *last_publ = p->key;
2816                         return err;
2817                 }
2818         }
2819         *last_publ = 0;
2820
2821         return 0;
2822 }
2823
2824 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2825 {
2826         int err;
2827         u32 tsk_portid = cb->args[0];
2828         u32 last_publ = cb->args[1];
2829         u32 done = cb->args[2];
2830         struct tipc_sock *tsk;
2831
2832         if (!tsk_portid) {
2833                 struct nlattr **attrs;
2834                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2835
2836                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2837                 if (err)
2838                         return err;
2839
2840                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2841                                        attrs[TIPC_NLA_SOCK],
2842                                        tipc_nl_sock_policy);
2843                 if (err)
2844                         return err;
2845
2846                 if (!sock[TIPC_NLA_SOCK_REF])
2847                         return -EINVAL;
2848
2849                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2850         }
2851
2852         if (done)
2853                 return 0;
2854
2855         tsk = tipc_sk_lookup(tsk_portid);
2856         if (!tsk)
2857                 return -EINVAL;
2858
2859         lock_sock(&tsk->sk);
2860         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2861         if (!err)
2862                 done = 1;
2863         release_sock(&tsk->sk);
2864         sock_put(&tsk->sk);
2865
2866         cb->args[0] = tsk_portid;
2867         cb->args[1] = last_publ;
2868         cb->args[2] = done;
2869
2870         return skb->len;
2871 }