tipc: simplify message forwarding and rejection in socket layer
[firefly-linux-kernel-4.4.55.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/jhash.h>
39 #include "core.h"
40 #include "name_table.h"
41 #include "node.h"
42 #include "link.h"
43 #include "config.h"
44 #include "socket.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @port: port - interacts with 'sk' and with the rest of the TIPC stack
73  * @peer_name: the peer of the connection, if any
74  * @conn_timeout: the time we can wait for an unresponded setup request
75  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
76  * @link_cong: non-zero if owner must sleep because of link congestion
77  * @sent_unacked: # messages sent by socket, and not yet acked by peer
78  * @rcv_unacked: # messages read by user, but not yet acked back to peer
79  * @node: hash table node
80  * @rcu: rcu struct for tipc_sock
81  */
82 struct tipc_sock {
83         struct sock sk;
84         int connected;
85         u32 conn_type;
86         u32 conn_instance;
87         int published;
88         u32 max_pkt;
89         u32 portid;
90         struct tipc_msg phdr;
91         struct list_head sock_list;
92         struct list_head publications;
93         u32 pub_count;
94         u32 probing_state;
95         unsigned long probing_intv;
96         uint conn_timeout;
97         atomic_t dupl_rcvcnt;
98         bool link_cong;
99         uint sent_unacked;
100         uint rcv_unacked;
101         struct rhash_head node;
102         struct rcu_head rcu;
103 };
104
105 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
106 static void tipc_data_ready(struct sock *sk);
107 static void tipc_write_space(struct sock *sk);
108 static int tipc_release(struct socket *sock);
109 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
110 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
111 static void tipc_sk_timeout(unsigned long data);
112 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
113                            struct tipc_name_seq const *seq);
114 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
115                             struct tipc_name_seq const *seq);
116 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
117 static int tipc_sk_insert(struct tipc_sock *tsk);
118 static void tipc_sk_remove(struct tipc_sock *tsk);
119
120 static const struct proto_ops packet_ops;
121 static const struct proto_ops stream_ops;
122 static const struct proto_ops msg_ops;
123
124 static struct proto tipc_proto;
125 static struct proto tipc_proto_kern;
126
127 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
128         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
129         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
130         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
131         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
132         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
133 };
134
135 /*
136  * Revised TIPC socket locking policy:
137  *
138  * Most socket operations take the standard socket lock when they start
139  * and hold it until they finish (or until they need to sleep).  Acquiring
140  * this lock grants the owner exclusive access to the fields of the socket
141  * data structures, with the exception of the backlog queue.  A few socket
142  * operations can be done without taking the socket lock because they only
143  * read socket information that never changes during the life of the socket.
144  *
145  * Socket operations may acquire the lock for the associated TIPC port if they
146  * need to perform an operation on the port.  If any routine needs to acquire
147  * both the socket lock and the port lock it must take the socket lock first
148  * to avoid the risk of deadlock.
149  *
150  * The dispatcher handling incoming messages cannot grab the socket lock in
151  * the standard fashion, since invoked it runs at the BH level and cannot block.
152  * Instead, it checks to see if the socket lock is currently owned by someone,
153  * and either handles the message itself or adds it to the socket's backlog
154  * queue; in the latter case the queued message is processed once the process
155  * owning the socket lock releases it.
156  *
157  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
158  * the problem of a blocked socket operation preventing any other operations
159  * from occurring.  However, applications must be careful if they have
160  * multiple threads trying to send (or receive) on the same socket, as these
161  * operations might interfere with each other.  For example, doing a connect
162  * and a receive at the same time might allow the receive to consume the
163  * ACK message meant for the connect.  While additional work could be done
164  * to try and overcome this, it doesn't seem to be worthwhile at the present.
165  *
166  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
167  * that another operation that must be performed in a non-blocking manner is
168  * not delayed for very long because the lock has already been taken.
169  *
170  * NOTE: This code assumes that certain fields of a port/socket pair are
171  * constant over its lifetime; such fields can be examined without taking
172  * the socket lock and/or port lock, and do not need to be re-read even
173  * after resuming processing after waiting.  These fields include:
174  *   - socket type
175  *   - pointer to socket sk structure (aka tipc_sock structure)
176  *   - pointer to port structure
177  *   - port reference
178  */
179
180 static u32 tsk_own_node(struct tipc_sock *tsk)
181 {
182         return msg_prevnode(&tsk->phdr);
183 }
184
185 static u32 tsk_peer_node(struct tipc_sock *tsk)
186 {
187         return msg_destnode(&tsk->phdr);
188 }
189
190 static u32 tsk_peer_port(struct tipc_sock *tsk)
191 {
192         return msg_destport(&tsk->phdr);
193 }
194
195 static  bool tsk_unreliable(struct tipc_sock *tsk)
196 {
197         return msg_src_droppable(&tsk->phdr) != 0;
198 }
199
200 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
201 {
202         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
203 }
204
205 static bool tsk_unreturnable(struct tipc_sock *tsk)
206 {
207         return msg_dest_droppable(&tsk->phdr) != 0;
208 }
209
210 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
211 {
212         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
213 }
214
215 static int tsk_importance(struct tipc_sock *tsk)
216 {
217         return msg_importance(&tsk->phdr);
218 }
219
220 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
221 {
222         if (imp > TIPC_CRITICAL_IMPORTANCE)
223                 return -EINVAL;
224         msg_set_importance(&tsk->phdr, (u32)imp);
225         return 0;
226 }
227
228 static struct tipc_sock *tipc_sk(const struct sock *sk)
229 {
230         return container_of(sk, struct tipc_sock, sk);
231 }
232
233 static int tsk_conn_cong(struct tipc_sock *tsk)
234 {
235         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
236 }
237
238 /**
239  * tsk_advance_rx_queue - discard first buffer in socket receive queue
240  *
241  * Caller must hold socket lock
242  */
243 static void tsk_advance_rx_queue(struct sock *sk)
244 {
245         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
246 }
247
248 /**
249  * tsk_rej_rx_queue - reject all buffers in socket receive queue
250  *
251  * Caller must hold socket lock
252  */
253 static void tsk_rej_rx_queue(struct sock *sk)
254 {
255         struct sk_buff *skb;
256         u32 dnode;
257         u32 own_node = tsk_own_node(tipc_sk(sk));
258
259         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
260                 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
261                         tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
262         }
263 }
264
265 /* tsk_peer_msg - verify if message was sent by connected port's peer
266  *
267  * Handles cases where the node's network address has changed from
268  * the default of <0.0.0> to its configured setting.
269  */
270 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
271 {
272         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
273         u32 peer_port = tsk_peer_port(tsk);
274         u32 orig_node;
275         u32 peer_node;
276
277         if (unlikely(!tsk->connected))
278                 return false;
279
280         if (unlikely(msg_origport(msg) != peer_port))
281                 return false;
282
283         orig_node = msg_orignode(msg);
284         peer_node = tsk_peer_node(tsk);
285
286         if (likely(orig_node == peer_node))
287                 return true;
288
289         if (!orig_node && (peer_node == tn->own_addr))
290                 return true;
291
292         if (!peer_node && (orig_node == tn->own_addr))
293                 return true;
294
295         return false;
296 }
297
298 /**
299  * tipc_sk_create - create a TIPC socket
300  * @net: network namespace (must be default network)
301  * @sock: pre-allocated socket structure
302  * @protocol: protocol indicator (must be 0)
303  * @kern: caused by kernel or by userspace?
304  *
305  * This routine creates additional data structures used by the TIPC socket,
306  * initializes them, and links them together.
307  *
308  * Returns 0 on success, errno otherwise
309  */
310 static int tipc_sk_create(struct net *net, struct socket *sock,
311                           int protocol, int kern)
312 {
313         struct tipc_net *tn;
314         const struct proto_ops *ops;
315         socket_state state;
316         struct sock *sk;
317         struct tipc_sock *tsk;
318         struct tipc_msg *msg;
319
320         /* Validate arguments */
321         if (unlikely(protocol != 0))
322                 return -EPROTONOSUPPORT;
323
324         switch (sock->type) {
325         case SOCK_STREAM:
326                 ops = &stream_ops;
327                 state = SS_UNCONNECTED;
328                 break;
329         case SOCK_SEQPACKET:
330                 ops = &packet_ops;
331                 state = SS_UNCONNECTED;
332                 break;
333         case SOCK_DGRAM:
334         case SOCK_RDM:
335                 ops = &msg_ops;
336                 state = SS_READY;
337                 break;
338         default:
339                 return -EPROTOTYPE;
340         }
341
342         /* Allocate socket's protocol area */
343         if (!kern)
344                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
345         else
346                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
347
348         if (sk == NULL)
349                 return -ENOMEM;
350
351         tsk = tipc_sk(sk);
352         tsk->max_pkt = MAX_PKT_DEFAULT;
353         INIT_LIST_HEAD(&tsk->publications);
354         msg = &tsk->phdr;
355         tn = net_generic(sock_net(sk), tipc_net_id);
356         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
357                       NAMED_H_SIZE, 0);
358
359         /* Finish initializing socket data structures */
360         sock->ops = ops;
361         sock->state = state;
362         sock_init_data(sock, sk);
363         if (tipc_sk_insert(tsk)) {
364                 pr_warn("Socket create failed; port numbrer exhausted\n");
365                 return -EINVAL;
366         }
367         msg_set_origport(msg, tsk->portid);
368         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
369         sk->sk_backlog_rcv = tipc_backlog_rcv;
370         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
371         sk->sk_data_ready = tipc_data_ready;
372         sk->sk_write_space = tipc_write_space;
373         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
374         tsk->sent_unacked = 0;
375         atomic_set(&tsk->dupl_rcvcnt, 0);
376
377         if (sock->state == SS_READY) {
378                 tsk_set_unreturnable(tsk, true);
379                 if (sock->type == SOCK_DGRAM)
380                         tsk_set_unreliable(tsk, true);
381         }
382         return 0;
383 }
384
385 /**
386  * tipc_sock_create_local - create TIPC socket from inside TIPC module
387  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
388  *
389  * We cannot use sock_creat_kern here because it bumps module user count.
390  * Since socket owner and creator is the same module we must make sure
391  * that module count remains zero for module local sockets, otherwise
392  * we cannot do rmmod.
393  *
394  * Returns 0 on success, errno otherwise
395  */
396 int tipc_sock_create_local(struct net *net, int type, struct socket **res)
397 {
398         int rc;
399
400         rc = sock_create_lite(AF_TIPC, type, 0, res);
401         if (rc < 0) {
402                 pr_err("Failed to create kernel socket\n");
403                 return rc;
404         }
405         tipc_sk_create(net, *res, 0, 1);
406
407         return 0;
408 }
409
410 /**
411  * tipc_sock_release_local - release socket created by tipc_sock_create_local
412  * @sock: the socket to be released.
413  *
414  * Module reference count is not incremented when such sockets are created,
415  * so we must keep it from being decremented when they are released.
416  */
417 void tipc_sock_release_local(struct socket *sock)
418 {
419         tipc_release(sock);
420         sock->ops = NULL;
421         sock_release(sock);
422 }
423
424 /**
425  * tipc_sock_accept_local - accept a connection on a socket created
426  * with tipc_sock_create_local. Use this function to avoid that
427  * module reference count is inadvertently incremented.
428  *
429  * @sock:    the accepting socket
430  * @newsock: reference to the new socket to be created
431  * @flags:   socket flags
432  */
433
434 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
435                            int flags)
436 {
437         struct sock *sk = sock->sk;
438         int ret;
439
440         ret = sock_create_lite(sk->sk_family, sk->sk_type,
441                                sk->sk_protocol, newsock);
442         if (ret < 0)
443                 return ret;
444
445         ret = tipc_accept(sock, *newsock, flags);
446         if (ret < 0) {
447                 sock_release(*newsock);
448                 return ret;
449         }
450         (*newsock)->ops = sock->ops;
451         return ret;
452 }
453
454 static void tipc_sk_callback(struct rcu_head *head)
455 {
456         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
457
458         sock_put(&tsk->sk);
459 }
460
461 /**
462  * tipc_release - destroy a TIPC socket
463  * @sock: socket to destroy
464  *
465  * This routine cleans up any messages that are still queued on the socket.
466  * For DGRAM and RDM socket types, all queued messages are rejected.
467  * For SEQPACKET and STREAM socket types, the first message is rejected
468  * and any others are discarded.  (If the first message on a STREAM socket
469  * is partially-read, it is discarded and the next one is rejected instead.)
470  *
471  * NOTE: Rejected messages are not necessarily returned to the sender!  They
472  * are returned or discarded according to the "destination droppable" setting
473  * specified for the message by the sender.
474  *
475  * Returns 0 on success, errno otherwise
476  */
477 static int tipc_release(struct socket *sock)
478 {
479         struct sock *sk = sock->sk;
480         struct net *net;
481         struct tipc_sock *tsk;
482         struct sk_buff *skb;
483         u32 dnode, probing_state;
484
485         /*
486          * Exit if socket isn't fully initialized (occurs when a failed accept()
487          * releases a pre-allocated child socket that was never used)
488          */
489         if (sk == NULL)
490                 return 0;
491
492         net = sock_net(sk);
493         tsk = tipc_sk(sk);
494         lock_sock(sk);
495
496         /*
497          * Reject all unreceived messages, except on an active connection
498          * (which disconnects locally & sends a 'FIN+' to peer)
499          */
500         dnode = tsk_peer_node(tsk);
501         while (sock->state != SS_DISCONNECTING) {
502                 skb = __skb_dequeue(&sk->sk_receive_queue);
503                 if (skb == NULL)
504                         break;
505                 if (TIPC_SKB_CB(skb)->handle != NULL)
506                         kfree_skb(skb);
507                 else {
508                         if ((sock->state == SS_CONNECTING) ||
509                             (sock->state == SS_CONNECTED)) {
510                                 sock->state = SS_DISCONNECTING;
511                                 tsk->connected = 0;
512                                 tipc_node_remove_conn(net, dnode, tsk->portid);
513                         }
514                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
515                                              TIPC_ERR_NO_PORT))
516                                 tipc_link_xmit_skb(net, skb, dnode, 0);
517                 }
518         }
519
520         tipc_sk_withdraw(tsk, 0, NULL);
521         probing_state = tsk->probing_state;
522         if (del_timer_sync(&sk->sk_timer) &&
523             probing_state != TIPC_CONN_PROBING)
524                 sock_put(sk);
525         tipc_sk_remove(tsk);
526         if (tsk->connected) {
527                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
528                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
529                                       tsk_own_node(tsk), tsk_peer_port(tsk),
530                                       tsk->portid, TIPC_ERR_NO_PORT);
531                 if (skb)
532                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
533                 tipc_node_remove_conn(net, dnode, tsk->portid);
534         }
535
536         /* Discard any remaining (connection-based) messages in receive queue */
537         __skb_queue_purge(&sk->sk_receive_queue);
538
539         /* Reject any messages that accumulated in backlog queue */
540         sock->state = SS_DISCONNECTING;
541         release_sock(sk);
542
543         call_rcu(&tsk->rcu, tipc_sk_callback);
544         sock->sk = NULL;
545
546         return 0;
547 }
548
549 /**
550  * tipc_bind - associate or disassocate TIPC name(s) with a socket
551  * @sock: socket structure
552  * @uaddr: socket address describing name(s) and desired operation
553  * @uaddr_len: size of socket address data structure
554  *
555  * Name and name sequence binding is indicated using a positive scope value;
556  * a negative scope value unbinds the specified name.  Specifying no name
557  * (i.e. a socket address length of 0) unbinds all names from the socket.
558  *
559  * Returns 0 on success, errno otherwise
560  *
561  * NOTE: This routine doesn't need to take the socket lock since it doesn't
562  *       access any non-constant socket information.
563  */
564 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
565                      int uaddr_len)
566 {
567         struct sock *sk = sock->sk;
568         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
569         struct tipc_sock *tsk = tipc_sk(sk);
570         int res = -EINVAL;
571
572         lock_sock(sk);
573         if (unlikely(!uaddr_len)) {
574                 res = tipc_sk_withdraw(tsk, 0, NULL);
575                 goto exit;
576         }
577
578         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
579                 res = -EINVAL;
580                 goto exit;
581         }
582         if (addr->family != AF_TIPC) {
583                 res = -EAFNOSUPPORT;
584                 goto exit;
585         }
586
587         if (addr->addrtype == TIPC_ADDR_NAME)
588                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
589         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
590                 res = -EAFNOSUPPORT;
591                 goto exit;
592         }
593
594         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
595             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
596             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
597                 res = -EACCES;
598                 goto exit;
599         }
600
601         res = (addr->scope > 0) ?
602                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
603                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
604 exit:
605         release_sock(sk);
606         return res;
607 }
608
609 /**
610  * tipc_getname - get port ID of socket or peer socket
611  * @sock: socket structure
612  * @uaddr: area for returned socket address
613  * @uaddr_len: area for returned length of socket address
614  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
615  *
616  * Returns 0 on success, errno otherwise
617  *
618  * NOTE: This routine doesn't need to take the socket lock since it only
619  *       accesses socket information that is unchanging (or which changes in
620  *       a completely predictable manner).
621  */
622 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
623                         int *uaddr_len, int peer)
624 {
625         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
626         struct tipc_sock *tsk = tipc_sk(sock->sk);
627         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
628
629         memset(addr, 0, sizeof(*addr));
630         if (peer) {
631                 if ((sock->state != SS_CONNECTED) &&
632                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
633                         return -ENOTCONN;
634                 addr->addr.id.ref = tsk_peer_port(tsk);
635                 addr->addr.id.node = tsk_peer_node(tsk);
636         } else {
637                 addr->addr.id.ref = tsk->portid;
638                 addr->addr.id.node = tn->own_addr;
639         }
640
641         *uaddr_len = sizeof(*addr);
642         addr->addrtype = TIPC_ADDR_ID;
643         addr->family = AF_TIPC;
644         addr->scope = 0;
645         addr->addr.name.domain = 0;
646
647         return 0;
648 }
649
650 /**
651  * tipc_poll - read and possibly block on pollmask
652  * @file: file structure associated with the socket
653  * @sock: socket for which to calculate the poll bits
654  * @wait: ???
655  *
656  * Returns pollmask value
657  *
658  * COMMENTARY:
659  * It appears that the usual socket locking mechanisms are not useful here
660  * since the pollmask info is potentially out-of-date the moment this routine
661  * exits.  TCP and other protocols seem to rely on higher level poll routines
662  * to handle any preventable race conditions, so TIPC will do the same ...
663  *
664  * TIPC sets the returned events as follows:
665  *
666  * socket state         flags set
667  * ------------         ---------
668  * unconnected          no read flags
669  *                      POLLOUT if port is not congested
670  *
671  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
672  *                      no write flags
673  *
674  * connected            POLLIN/POLLRDNORM if data in rx queue
675  *                      POLLOUT if port is not congested
676  *
677  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
678  *                      no write flags
679  *
680  * listening            POLLIN if SYN in rx queue
681  *                      no write flags
682  *
683  * ready                POLLIN/POLLRDNORM if data in rx queue
684  * [connectionless]     POLLOUT (since port cannot be congested)
685  *
686  * IMPORTANT: The fact that a read or write operation is indicated does NOT
687  * imply that the operation will succeed, merely that it should be performed
688  * and will not block.
689  */
690 static unsigned int tipc_poll(struct file *file, struct socket *sock,
691                               poll_table *wait)
692 {
693         struct sock *sk = sock->sk;
694         struct tipc_sock *tsk = tipc_sk(sk);
695         u32 mask = 0;
696
697         sock_poll_wait(file, sk_sleep(sk), wait);
698
699         switch ((int)sock->state) {
700         case SS_UNCONNECTED:
701                 if (!tsk->link_cong)
702                         mask |= POLLOUT;
703                 break;
704         case SS_READY:
705         case SS_CONNECTED:
706                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
707                         mask |= POLLOUT;
708                 /* fall thru' */
709         case SS_CONNECTING:
710         case SS_LISTENING:
711                 if (!skb_queue_empty(&sk->sk_receive_queue))
712                         mask |= (POLLIN | POLLRDNORM);
713                 break;
714         case SS_DISCONNECTING:
715                 mask = (POLLIN | POLLRDNORM | POLLHUP);
716                 break;
717         }
718
719         return mask;
720 }
721
722 /**
723  * tipc_sendmcast - send multicast message
724  * @sock: socket structure
725  * @seq: destination address
726  * @msg: message to send
727  * @dsz: total length of message data
728  * @timeo: timeout to wait for wakeup
729  *
730  * Called from function tipc_sendmsg(), which has done all sanity checks
731  * Returns the number of bytes sent on success, or errno
732  */
733 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
734                           struct msghdr *msg, size_t dsz, long timeo)
735 {
736         struct sock *sk = sock->sk;
737         struct tipc_sock *tsk = tipc_sk(sk);
738         struct net *net = sock_net(sk);
739         struct tipc_msg *mhdr = &tsk->phdr;
740         struct sk_buff_head head;
741         struct iov_iter save = msg->msg_iter;
742         uint mtu;
743         int rc;
744
745         msg_set_type(mhdr, TIPC_MCAST_MSG);
746         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
747         msg_set_destport(mhdr, 0);
748         msg_set_destnode(mhdr, 0);
749         msg_set_nametype(mhdr, seq->type);
750         msg_set_namelower(mhdr, seq->lower);
751         msg_set_nameupper(mhdr, seq->upper);
752         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
753
754 new_mtu:
755         mtu = tipc_bclink_get_mtu();
756         __skb_queue_head_init(&head);
757         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
758         if (unlikely(rc < 0))
759                 return rc;
760
761         do {
762                 rc = tipc_bclink_xmit(net, &head);
763                 if (likely(rc >= 0)) {
764                         rc = dsz;
765                         break;
766                 }
767                 if (rc == -EMSGSIZE) {
768                         msg->msg_iter = save;
769                         goto new_mtu;
770                 }
771                 if (rc != -ELINKCONG)
772                         break;
773                 tipc_sk(sk)->link_cong = 1;
774                 rc = tipc_wait_for_sndmsg(sock, &timeo);
775                 if (rc)
776                         __skb_queue_purge(&head);
777         } while (!rc);
778         return rc;
779 }
780
781 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
782  */
783 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
784 {
785         struct tipc_msg *msg = buf_msg(buf);
786         struct tipc_port_list dports = {0, NULL, };
787         struct tipc_port_list *item;
788         struct sk_buff *b;
789         uint i, last, dst = 0;
790         u32 scope = TIPC_CLUSTER_SCOPE;
791
792         if (in_own_node(net, msg_orignode(msg)))
793                 scope = TIPC_NODE_SCOPE;
794
795         /* Create destination port list: */
796         tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
797                                   msg_nameupper(msg), scope, &dports);
798         last = dports.count;
799         if (!last) {
800                 kfree_skb(buf);
801                 return;
802         }
803
804         for (item = &dports; item; item = item->next) {
805                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
806                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
807                         if (!b) {
808                                 pr_warn("Failed do clone mcast rcv buffer\n");
809                                 continue;
810                         }
811                         msg_set_destport(msg, item->ports[i]);
812                         tipc_sk_rcv(net, b);
813                 }
814         }
815         tipc_port_list_free(&dports);
816 }
817
818 /**
819  * tipc_sk_proto_rcv - receive a connection mng protocol message
820  * @tsk: receiving socket
821  * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
822  */
823 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
824 {
825         struct tipc_msg *msg = buf_msg(*skb);
826         int conn_cong;
827         u32 dnode;
828         u32 own_node = tsk_own_node(tsk);
829         /* Ignore if connection cannot be validated: */
830         if (!tsk_peer_msg(tsk, msg))
831                 goto exit;
832
833         tsk->probing_state = TIPC_CONN_OK;
834
835         if (msg_type(msg) == CONN_ACK) {
836                 conn_cong = tsk_conn_cong(tsk);
837                 tsk->sent_unacked -= msg_msgcnt(msg);
838                 if (conn_cong)
839                         tsk->sk.sk_write_space(&tsk->sk);
840         } else if (msg_type(msg) == CONN_PROBE) {
841                 if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
842                         msg_set_type(msg, CONN_PROBE_REPLY);
843                         return;
844                 }
845         }
846         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
847 exit:
848         kfree_skb(*skb);
849         *skb = NULL;
850 }
851
852 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
853 {
854         struct sock *sk = sock->sk;
855         struct tipc_sock *tsk = tipc_sk(sk);
856         DEFINE_WAIT(wait);
857         int done;
858
859         do {
860                 int err = sock_error(sk);
861                 if (err)
862                         return err;
863                 if (sock->state == SS_DISCONNECTING)
864                         return -EPIPE;
865                 if (!*timeo_p)
866                         return -EAGAIN;
867                 if (signal_pending(current))
868                         return sock_intr_errno(*timeo_p);
869
870                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
871                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
872                 finish_wait(sk_sleep(sk), &wait);
873         } while (!done);
874         return 0;
875 }
876
877 /**
878  * tipc_sendmsg - send message in connectionless manner
879  * @iocb: if NULL, indicates that socket lock is already held
880  * @sock: socket structure
881  * @m: message to send
882  * @dsz: amount of user data to be sent
883  *
884  * Message must have an destination specified explicitly.
885  * Used for SOCK_RDM and SOCK_DGRAM messages,
886  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
887  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
888  *
889  * Returns the number of bytes sent on success, or errno otherwise
890  */
891 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
892                         struct msghdr *m, size_t dsz)
893 {
894         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
895         struct sock *sk = sock->sk;
896         struct tipc_sock *tsk = tipc_sk(sk);
897         struct net *net = sock_net(sk);
898         struct tipc_msg *mhdr = &tsk->phdr;
899         u32 dnode, dport;
900         struct sk_buff_head head;
901         struct sk_buff *skb;
902         struct tipc_name_seq *seq = &dest->addr.nameseq;
903         struct iov_iter save;
904         u32 mtu;
905         long timeo;
906         int rc;
907
908         if (unlikely(!dest))
909                 return -EDESTADDRREQ;
910
911         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
912                      (dest->family != AF_TIPC)))
913                 return -EINVAL;
914
915         if (dsz > TIPC_MAX_USER_MSG_SIZE)
916                 return -EMSGSIZE;
917
918         if (iocb)
919                 lock_sock(sk);
920
921         if (unlikely(sock->state != SS_READY)) {
922                 if (sock->state == SS_LISTENING) {
923                         rc = -EPIPE;
924                         goto exit;
925                 }
926                 if (sock->state != SS_UNCONNECTED) {
927                         rc = -EISCONN;
928                         goto exit;
929                 }
930                 if (tsk->published) {
931                         rc = -EOPNOTSUPP;
932                         goto exit;
933                 }
934                 if (dest->addrtype == TIPC_ADDR_NAME) {
935                         tsk->conn_type = dest->addr.name.name.type;
936                         tsk->conn_instance = dest->addr.name.name.instance;
937                 }
938         }
939
940         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
941
942         if (dest->addrtype == TIPC_ADDR_MCAST) {
943                 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
944                 goto exit;
945         } else if (dest->addrtype == TIPC_ADDR_NAME) {
946                 u32 type = dest->addr.name.name.type;
947                 u32 inst = dest->addr.name.name.instance;
948                 u32 domain = dest->addr.name.domain;
949
950                 dnode = domain;
951                 msg_set_type(mhdr, TIPC_NAMED_MSG);
952                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
953                 msg_set_nametype(mhdr, type);
954                 msg_set_nameinst(mhdr, inst);
955                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
956                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
957                 msg_set_destnode(mhdr, dnode);
958                 msg_set_destport(mhdr, dport);
959                 if (unlikely(!dport && !dnode)) {
960                         rc = -EHOSTUNREACH;
961                         goto exit;
962                 }
963         } else if (dest->addrtype == TIPC_ADDR_ID) {
964                 dnode = dest->addr.id.node;
965                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
966                 msg_set_lookup_scope(mhdr, 0);
967                 msg_set_destnode(mhdr, dnode);
968                 msg_set_destport(mhdr, dest->addr.id.ref);
969                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
970         }
971
972         save = m->msg_iter;
973 new_mtu:
974         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
975         __skb_queue_head_init(&head);
976         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
977         if (rc < 0)
978                 goto exit;
979
980         do {
981                 skb = skb_peek(&head);
982                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
983                 rc = tipc_link_xmit(net, &head, dnode, tsk->portid);
984                 if (likely(rc >= 0)) {
985                         if (sock->state != SS_READY)
986                                 sock->state = SS_CONNECTING;
987                         rc = dsz;
988                         break;
989                 }
990                 if (rc == -EMSGSIZE) {
991                         m->msg_iter = save;
992                         goto new_mtu;
993                 }
994                 if (rc != -ELINKCONG)
995                         break;
996                 tsk->link_cong = 1;
997                 rc = tipc_wait_for_sndmsg(sock, &timeo);
998                 if (rc)
999                         __skb_queue_purge(&head);
1000         } while (!rc);
1001 exit:
1002         if (iocb)
1003                 release_sock(sk);
1004
1005         return rc;
1006 }
1007
1008 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1009 {
1010         struct sock *sk = sock->sk;
1011         struct tipc_sock *tsk = tipc_sk(sk);
1012         DEFINE_WAIT(wait);
1013         int done;
1014
1015         do {
1016                 int err = sock_error(sk);
1017                 if (err)
1018                         return err;
1019                 if (sock->state == SS_DISCONNECTING)
1020                         return -EPIPE;
1021                 else if (sock->state != SS_CONNECTED)
1022                         return -ENOTCONN;
1023                 if (!*timeo_p)
1024                         return -EAGAIN;
1025                 if (signal_pending(current))
1026                         return sock_intr_errno(*timeo_p);
1027
1028                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1029                 done = sk_wait_event(sk, timeo_p,
1030                                      (!tsk->link_cong &&
1031                                       !tsk_conn_cong(tsk)) ||
1032                                      !tsk->connected);
1033                 finish_wait(sk_sleep(sk), &wait);
1034         } while (!done);
1035         return 0;
1036 }
1037
1038 /**
1039  * tipc_send_stream - send stream-oriented data
1040  * @iocb: (unused)
1041  * @sock: socket structure
1042  * @m: data to send
1043  * @dsz: total length of data to be transmitted
1044  *
1045  * Used for SOCK_STREAM data.
1046  *
1047  * Returns the number of bytes sent on success (or partial success),
1048  * or errno if no data sent
1049  */
1050 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1051                             struct msghdr *m, size_t dsz)
1052 {
1053         struct sock *sk = sock->sk;
1054         struct net *net = sock_net(sk);
1055         struct tipc_sock *tsk = tipc_sk(sk);
1056         struct tipc_msg *mhdr = &tsk->phdr;
1057         struct sk_buff_head head;
1058         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1059         u32 portid = tsk->portid;
1060         int rc = -EINVAL;
1061         long timeo;
1062         u32 dnode;
1063         uint mtu, send, sent = 0;
1064         struct iov_iter save;
1065
1066         /* Handle implied connection establishment */
1067         if (unlikely(dest)) {
1068                 rc = tipc_sendmsg(iocb, sock, m, dsz);
1069                 if (dsz && (dsz == rc))
1070                         tsk->sent_unacked = 1;
1071                 return rc;
1072         }
1073         if (dsz > (uint)INT_MAX)
1074                 return -EMSGSIZE;
1075
1076         if (iocb)
1077                 lock_sock(sk);
1078
1079         if (unlikely(sock->state != SS_CONNECTED)) {
1080                 if (sock->state == SS_DISCONNECTING)
1081                         rc = -EPIPE;
1082                 else
1083                         rc = -ENOTCONN;
1084                 goto exit;
1085         }
1086
1087         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1088         dnode = tsk_peer_node(tsk);
1089
1090 next:
1091         save = m->msg_iter;
1092         mtu = tsk->max_pkt;
1093         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1094         __skb_queue_head_init(&head);
1095         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1096         if (unlikely(rc < 0))
1097                 goto exit;
1098         do {
1099                 if (likely(!tsk_conn_cong(tsk))) {
1100                         rc = tipc_link_xmit(net, &head, dnode, portid);
1101                         if (likely(!rc)) {
1102                                 tsk->sent_unacked++;
1103                                 sent += send;
1104                                 if (sent == dsz)
1105                                         break;
1106                                 goto next;
1107                         }
1108                         if (rc == -EMSGSIZE) {
1109                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1110                                                                  portid);
1111                                 m->msg_iter = save;
1112                                 goto next;
1113                         }
1114                         if (rc != -ELINKCONG)
1115                                 break;
1116                         tsk->link_cong = 1;
1117                 }
1118                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1119                 if (rc)
1120                         __skb_queue_purge(&head);
1121         } while (!rc);
1122 exit:
1123         if (iocb)
1124                 release_sock(sk);
1125         return sent ? sent : rc;
1126 }
1127
1128 /**
1129  * tipc_send_packet - send a connection-oriented message
1130  * @iocb: if NULL, indicates that socket lock is already held
1131  * @sock: socket structure
1132  * @m: message to send
1133  * @dsz: length of data to be transmitted
1134  *
1135  * Used for SOCK_SEQPACKET messages.
1136  *
1137  * Returns the number of bytes sent on success, or errno otherwise
1138  */
1139 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1140                             struct msghdr *m, size_t dsz)
1141 {
1142         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1143                 return -EMSGSIZE;
1144
1145         return tipc_send_stream(iocb, sock, m, dsz);
1146 }
1147
1148 /* tipc_sk_finish_conn - complete the setup of a connection
1149  */
1150 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1151                                 u32 peer_node)
1152 {
1153         struct sock *sk = &tsk->sk;
1154         struct net *net = sock_net(sk);
1155         struct tipc_msg *msg = &tsk->phdr;
1156
1157         msg_set_destnode(msg, peer_node);
1158         msg_set_destport(msg, peer_port);
1159         msg_set_type(msg, TIPC_CONN_MSG);
1160         msg_set_lookup_scope(msg, 0);
1161         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1162
1163         tsk->probing_intv = CONN_PROBING_INTERVAL;
1164         tsk->probing_state = TIPC_CONN_OK;
1165         tsk->connected = 1;
1166         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1167         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1168         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1169 }
1170
1171 /**
1172  * set_orig_addr - capture sender's address for received message
1173  * @m: descriptor for message info
1174  * @msg: received message header
1175  *
1176  * Note: Address is not captured if not requested by receiver.
1177  */
1178 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1179 {
1180         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1181
1182         if (addr) {
1183                 addr->family = AF_TIPC;
1184                 addr->addrtype = TIPC_ADDR_ID;
1185                 memset(&addr->addr, 0, sizeof(addr->addr));
1186                 addr->addr.id.ref = msg_origport(msg);
1187                 addr->addr.id.node = msg_orignode(msg);
1188                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1189                 addr->scope = 0;                /* could leave uninitialized */
1190                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1191         }
1192 }
1193
1194 /**
1195  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1196  * @m: descriptor for message info
1197  * @msg: received message header
1198  * @tsk: TIPC port associated with message
1199  *
1200  * Note: Ancillary data is not captured if not requested by receiver.
1201  *
1202  * Returns 0 if successful, otherwise errno
1203  */
1204 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1205                                  struct tipc_sock *tsk)
1206 {
1207         u32 anc_data[3];
1208         u32 err;
1209         u32 dest_type;
1210         int has_name;
1211         int res;
1212
1213         if (likely(m->msg_controllen == 0))
1214                 return 0;
1215
1216         /* Optionally capture errored message object(s) */
1217         err = msg ? msg_errcode(msg) : 0;
1218         if (unlikely(err)) {
1219                 anc_data[0] = err;
1220                 anc_data[1] = msg_data_sz(msg);
1221                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1222                 if (res)
1223                         return res;
1224                 if (anc_data[1]) {
1225                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1226                                        msg_data(msg));
1227                         if (res)
1228                                 return res;
1229                 }
1230         }
1231
1232         /* Optionally capture message destination object */
1233         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1234         switch (dest_type) {
1235         case TIPC_NAMED_MSG:
1236                 has_name = 1;
1237                 anc_data[0] = msg_nametype(msg);
1238                 anc_data[1] = msg_namelower(msg);
1239                 anc_data[2] = msg_namelower(msg);
1240                 break;
1241         case TIPC_MCAST_MSG:
1242                 has_name = 1;
1243                 anc_data[0] = msg_nametype(msg);
1244                 anc_data[1] = msg_namelower(msg);
1245                 anc_data[2] = msg_nameupper(msg);
1246                 break;
1247         case TIPC_CONN_MSG:
1248                 has_name = (tsk->conn_type != 0);
1249                 anc_data[0] = tsk->conn_type;
1250                 anc_data[1] = tsk->conn_instance;
1251                 anc_data[2] = tsk->conn_instance;
1252                 break;
1253         default:
1254                 has_name = 0;
1255         }
1256         if (has_name) {
1257                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1258                 if (res)
1259                         return res;
1260         }
1261
1262         return 0;
1263 }
1264
1265 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1266 {
1267         struct net *net = sock_net(&tsk->sk);
1268         struct sk_buff *skb = NULL;
1269         struct tipc_msg *msg;
1270         u32 peer_port = tsk_peer_port(tsk);
1271         u32 dnode = tsk_peer_node(tsk);
1272
1273         if (!tsk->connected)
1274                 return;
1275         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1276                               dnode, tsk_own_node(tsk), peer_port,
1277                               tsk->portid, TIPC_OK);
1278         if (!skb)
1279                 return;
1280         msg = buf_msg(skb);
1281         msg_set_msgcnt(msg, ack);
1282         tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1283 }
1284
1285 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1286 {
1287         struct sock *sk = sock->sk;
1288         DEFINE_WAIT(wait);
1289         long timeo = *timeop;
1290         int err;
1291
1292         for (;;) {
1293                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1294                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1295                         if (sock->state == SS_DISCONNECTING) {
1296                                 err = -ENOTCONN;
1297                                 break;
1298                         }
1299                         release_sock(sk);
1300                         timeo = schedule_timeout(timeo);
1301                         lock_sock(sk);
1302                 }
1303                 err = 0;
1304                 if (!skb_queue_empty(&sk->sk_receive_queue))
1305                         break;
1306                 err = sock_intr_errno(timeo);
1307                 if (signal_pending(current))
1308                         break;
1309                 err = -EAGAIN;
1310                 if (!timeo)
1311                         break;
1312         }
1313         finish_wait(sk_sleep(sk), &wait);
1314         *timeop = timeo;
1315         return err;
1316 }
1317
1318 /**
1319  * tipc_recvmsg - receive packet-oriented message
1320  * @iocb: (unused)
1321  * @m: descriptor for message info
1322  * @buf_len: total size of user buffer area
1323  * @flags: receive flags
1324  *
1325  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1326  * If the complete message doesn't fit in user area, truncate it.
1327  *
1328  * Returns size of returned message data, errno otherwise
1329  */
1330 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1331                         struct msghdr *m, size_t buf_len, int flags)
1332 {
1333         struct sock *sk = sock->sk;
1334         struct tipc_sock *tsk = tipc_sk(sk);
1335         struct sk_buff *buf;
1336         struct tipc_msg *msg;
1337         long timeo;
1338         unsigned int sz;
1339         u32 err;
1340         int res;
1341
1342         /* Catch invalid receive requests */
1343         if (unlikely(!buf_len))
1344                 return -EINVAL;
1345
1346         lock_sock(sk);
1347
1348         if (unlikely(sock->state == SS_UNCONNECTED)) {
1349                 res = -ENOTCONN;
1350                 goto exit;
1351         }
1352
1353         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1354 restart:
1355
1356         /* Look for a message in receive queue; wait if necessary */
1357         res = tipc_wait_for_rcvmsg(sock, &timeo);
1358         if (res)
1359                 goto exit;
1360
1361         /* Look at first message in receive queue */
1362         buf = skb_peek(&sk->sk_receive_queue);
1363         msg = buf_msg(buf);
1364         sz = msg_data_sz(msg);
1365         err = msg_errcode(msg);
1366
1367         /* Discard an empty non-errored message & try again */
1368         if ((!sz) && (!err)) {
1369                 tsk_advance_rx_queue(sk);
1370                 goto restart;
1371         }
1372
1373         /* Capture sender's address (optional) */
1374         set_orig_addr(m, msg);
1375
1376         /* Capture ancillary data (optional) */
1377         res = tipc_sk_anc_data_recv(m, msg, tsk);
1378         if (res)
1379                 goto exit;
1380
1381         /* Capture message data (if valid) & compute return value (always) */
1382         if (!err) {
1383                 if (unlikely(buf_len < sz)) {
1384                         sz = buf_len;
1385                         m->msg_flags |= MSG_TRUNC;
1386                 }
1387                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1388                 if (res)
1389                         goto exit;
1390                 res = sz;
1391         } else {
1392                 if ((sock->state == SS_READY) ||
1393                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1394                         res = 0;
1395                 else
1396                         res = -ECONNRESET;
1397         }
1398
1399         /* Consume received message (optional) */
1400         if (likely(!(flags & MSG_PEEK))) {
1401                 if ((sock->state != SS_READY) &&
1402                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1403                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1404                         tsk->rcv_unacked = 0;
1405                 }
1406                 tsk_advance_rx_queue(sk);
1407         }
1408 exit:
1409         release_sock(sk);
1410         return res;
1411 }
1412
1413 /**
1414  * tipc_recv_stream - receive stream-oriented data
1415  * @iocb: (unused)
1416  * @m: descriptor for message info
1417  * @buf_len: total size of user buffer area
1418  * @flags: receive flags
1419  *
1420  * Used for SOCK_STREAM messages only.  If not enough data is available
1421  * will optionally wait for more; never truncates data.
1422  *
1423  * Returns size of returned message data, errno otherwise
1424  */
1425 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1426                             struct msghdr *m, size_t buf_len, int flags)
1427 {
1428         struct sock *sk = sock->sk;
1429         struct tipc_sock *tsk = tipc_sk(sk);
1430         struct sk_buff *buf;
1431         struct tipc_msg *msg;
1432         long timeo;
1433         unsigned int sz;
1434         int sz_to_copy, target, needed;
1435         int sz_copied = 0;
1436         u32 err;
1437         int res = 0;
1438
1439         /* Catch invalid receive attempts */
1440         if (unlikely(!buf_len))
1441                 return -EINVAL;
1442
1443         lock_sock(sk);
1444
1445         if (unlikely(sock->state == SS_UNCONNECTED)) {
1446                 res = -ENOTCONN;
1447                 goto exit;
1448         }
1449
1450         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1451         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1452
1453 restart:
1454         /* Look for a message in receive queue; wait if necessary */
1455         res = tipc_wait_for_rcvmsg(sock, &timeo);
1456         if (res)
1457                 goto exit;
1458
1459         /* Look at first message in receive queue */
1460         buf = skb_peek(&sk->sk_receive_queue);
1461         msg = buf_msg(buf);
1462         sz = msg_data_sz(msg);
1463         err = msg_errcode(msg);
1464
1465         /* Discard an empty non-errored message & try again */
1466         if ((!sz) && (!err)) {
1467                 tsk_advance_rx_queue(sk);
1468                 goto restart;
1469         }
1470
1471         /* Optionally capture sender's address & ancillary data of first msg */
1472         if (sz_copied == 0) {
1473                 set_orig_addr(m, msg);
1474                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1475                 if (res)
1476                         goto exit;
1477         }
1478
1479         /* Capture message data (if valid) & compute return value (always) */
1480         if (!err) {
1481                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1482
1483                 sz -= offset;
1484                 needed = (buf_len - sz_copied);
1485                 sz_to_copy = (sz <= needed) ? sz : needed;
1486
1487                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1488                                             m, sz_to_copy);
1489                 if (res)
1490                         goto exit;
1491
1492                 sz_copied += sz_to_copy;
1493
1494                 if (sz_to_copy < sz) {
1495                         if (!(flags & MSG_PEEK))
1496                                 TIPC_SKB_CB(buf)->handle =
1497                                 (void *)(unsigned long)(offset + sz_to_copy);
1498                         goto exit;
1499                 }
1500         } else {
1501                 if (sz_copied != 0)
1502                         goto exit; /* can't add error msg to valid data */
1503
1504                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1505                         res = 0;
1506                 else
1507                         res = -ECONNRESET;
1508         }
1509
1510         /* Consume received message (optional) */
1511         if (likely(!(flags & MSG_PEEK))) {
1512                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1513                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1514                         tsk->rcv_unacked = 0;
1515                 }
1516                 tsk_advance_rx_queue(sk);
1517         }
1518
1519         /* Loop around if more data is required */
1520         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1521             (!skb_queue_empty(&sk->sk_receive_queue) ||
1522             (sz_copied < target)) &&    /* and more is ready or required */
1523             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1524             (!err))                     /* and haven't reached a FIN */
1525                 goto restart;
1526
1527 exit:
1528         release_sock(sk);
1529         return sz_copied ? sz_copied : res;
1530 }
1531
1532 /**
1533  * tipc_write_space - wake up thread if port congestion is released
1534  * @sk: socket
1535  */
1536 static void tipc_write_space(struct sock *sk)
1537 {
1538         struct socket_wq *wq;
1539
1540         rcu_read_lock();
1541         wq = rcu_dereference(sk->sk_wq);
1542         if (wq_has_sleeper(wq))
1543                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1544                                                 POLLWRNORM | POLLWRBAND);
1545         rcu_read_unlock();
1546 }
1547
1548 /**
1549  * tipc_data_ready - wake up threads to indicate messages have been received
1550  * @sk: socket
1551  * @len: the length of messages
1552  */
1553 static void tipc_data_ready(struct sock *sk)
1554 {
1555         struct socket_wq *wq;
1556
1557         rcu_read_lock();
1558         wq = rcu_dereference(sk->sk_wq);
1559         if (wq_has_sleeper(wq))
1560                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1561                                                 POLLRDNORM | POLLRDBAND);
1562         rcu_read_unlock();
1563 }
1564
1565 /**
1566  * filter_connect - Handle all incoming messages for a connection-based socket
1567  * @tsk: TIPC socket
1568  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1569  *
1570  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1571  */
1572 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
1573 {
1574         struct sock *sk = &tsk->sk;
1575         struct net *net = sock_net(sk);
1576         struct socket *sock = sk->sk_socket;
1577         struct tipc_msg *msg = buf_msg(*skb);
1578         int retval = -TIPC_ERR_NO_PORT;
1579
1580         if (msg_mcast(msg))
1581                 return retval;
1582
1583         switch ((int)sock->state) {
1584         case SS_CONNECTED:
1585                 /* Accept only connection-based messages sent by peer */
1586                 if (tsk_peer_msg(tsk, msg)) {
1587                         if (unlikely(msg_errcode(msg))) {
1588                                 sock->state = SS_DISCONNECTING;
1589                                 tsk->connected = 0;
1590                                 /* let timer expire on it's own */
1591                                 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1592                                                       tsk->portid);
1593                         }
1594                         retval = TIPC_OK;
1595                 }
1596                 break;
1597         case SS_CONNECTING:
1598                 /* Accept only ACK or NACK message */
1599
1600                 if (unlikely(!msg_connected(msg)))
1601                         break;
1602
1603                 if (unlikely(msg_errcode(msg))) {
1604                         sock->state = SS_DISCONNECTING;
1605                         sk->sk_err = ECONNREFUSED;
1606                         retval = TIPC_OK;
1607                         break;
1608                 }
1609
1610                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1611                         sock->state = SS_DISCONNECTING;
1612                         sk->sk_err = EINVAL;
1613                         retval = TIPC_OK;
1614                         break;
1615                 }
1616
1617                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1618                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1619                 sock->state = SS_CONNECTED;
1620
1621                 /* If an incoming message is an 'ACK-', it should be
1622                  * discarded here because it doesn't contain useful
1623                  * data. In addition, we should try to wake up
1624                  * connect() routine if sleeping.
1625                  */
1626                 if (msg_data_sz(msg) == 0) {
1627                         kfree_skb(*skb);
1628                         *skb = NULL;
1629                         if (waitqueue_active(sk_sleep(sk)))
1630                                 wake_up_interruptible(sk_sleep(sk));
1631                 }
1632                 retval = TIPC_OK;
1633                 break;
1634         case SS_LISTENING:
1635         case SS_UNCONNECTED:
1636                 /* Accept only SYN message */
1637                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1638                         retval = TIPC_OK;
1639                 break;
1640         case SS_DISCONNECTING:
1641                 break;
1642         default:
1643                 pr_err("Unknown socket state %u\n", sock->state);
1644         }
1645         return retval;
1646 }
1647
1648 /**
1649  * rcvbuf_limit - get proper overload limit of socket receive queue
1650  * @sk: socket
1651  * @buf: message
1652  *
1653  * For all connection oriented messages, irrespective of importance,
1654  * the default overload value (i.e. 67MB) is set as limit.
1655  *
1656  * For all connectionless messages, by default new queue limits are
1657  * as belows:
1658  *
1659  * TIPC_LOW_IMPORTANCE       (4 MB)
1660  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1661  * TIPC_HIGH_IMPORTANCE      (16 MB)
1662  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1663  *
1664  * Returns overload limit according to corresponding message importance
1665  */
1666 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1667 {
1668         struct tipc_msg *msg = buf_msg(buf);
1669
1670         if (msg_connected(msg))
1671                 return sysctl_tipc_rmem[2];
1672
1673         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1674                 msg_importance(msg);
1675 }
1676
1677 /**
1678  * filter_rcv - validate incoming message
1679  * @sk: socket
1680  * @skb: pointer to message. Set to NULL if buffer is consumed.
1681  *
1682  * Enqueues message on receive queue if acceptable; optionally handles
1683  * disconnect indication for a connected socket.
1684  *
1685  * Called with socket lock already taken
1686  *
1687  * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
1688  */
1689 static int filter_rcv(struct sock *sk, struct sk_buff **skb)
1690 {
1691         struct socket *sock = sk->sk_socket;
1692         struct tipc_sock *tsk = tipc_sk(sk);
1693         struct tipc_msg *msg = buf_msg(*skb);
1694         unsigned int limit = rcvbuf_limit(sk, *skb);
1695         int rc = TIPC_OK;
1696
1697         if (unlikely(msg_user(msg) == CONN_MANAGER)) {
1698                 tipc_sk_proto_rcv(tsk, skb);
1699                 return TIPC_OK;
1700         }
1701
1702         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1703                 kfree_skb(*skb);
1704                 tsk->link_cong = 0;
1705                 sk->sk_write_space(sk);
1706                 *skb = NULL;
1707                 return TIPC_OK;
1708         }
1709
1710         /* Reject message if it is wrong sort of message for socket */
1711         if (msg_type(msg) > TIPC_DIRECT_MSG)
1712                 return -TIPC_ERR_NO_PORT;
1713
1714         if (sock->state == SS_READY) {
1715                 if (msg_connected(msg))
1716                         return -TIPC_ERR_NO_PORT;
1717         } else {
1718                 rc = filter_connect(tsk, skb);
1719                 if (rc != TIPC_OK || !*skb)
1720                         return rc;
1721         }
1722
1723         /* Reject message if there isn't room to queue it */
1724         if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
1725                 return -TIPC_ERR_OVERLOAD;
1726
1727         /* Enqueue message */
1728         TIPC_SKB_CB(*skb)->handle = NULL;
1729         __skb_queue_tail(&sk->sk_receive_queue, *skb);
1730         skb_set_owner_r(*skb, sk);
1731
1732         sk->sk_data_ready(sk);
1733         *skb = NULL;
1734         return TIPC_OK;
1735 }
1736
1737 /**
1738  * tipc_backlog_rcv - handle incoming message from backlog queue
1739  * @sk: socket
1740  * @skb: message
1741  *
1742  * Caller must hold socket lock, but not port lock.
1743  *
1744  * Returns 0
1745  */
1746 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1747 {
1748         int err;
1749         atomic_t *dcnt;
1750         u32 dnode;
1751         struct tipc_sock *tsk = tipc_sk(sk);
1752         struct net *net = sock_net(sk);
1753         uint truesize = skb->truesize;
1754
1755         err = filter_rcv(sk, &skb);
1756         if (likely(!skb)) {
1757                 dcnt = &tsk->dupl_rcvcnt;
1758                 if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1759                         atomic_add(truesize, dcnt);
1760                 return 0;
1761         }
1762         if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
1763                 tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
1764         return 0;
1765 }
1766
1767 /**
1768  * tipc_sk_rcv - handle incoming message
1769  * @skb: buffer containing arriving message
1770  * Consumes buffer
1771  * Returns 0 if success, or errno: -EHOSTUNREACH
1772  */
1773 int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
1774 {
1775         struct tipc_sock *tsk;
1776         struct tipc_net *tn;
1777         struct sock *sk;
1778         u32 dport = msg_destport(buf_msg(skb));
1779         int err = TIPC_OK;
1780         uint limit;
1781         u32 dnode;
1782
1783         /* Validate destination and message */
1784         tsk = tipc_sk_lookup(net, dport);
1785         if (unlikely(!tsk)) {
1786                 err = tipc_msg_eval(net, skb, &dnode);
1787                 goto exit;
1788         }
1789         sk = &tsk->sk;
1790
1791         /* Queue message */
1792         spin_lock_bh(&sk->sk_lock.slock);
1793
1794         if (!sock_owned_by_user(sk)) {
1795                 err = filter_rcv(sk, &skb);
1796         } else {
1797                 if (sk->sk_backlog.len == 0)
1798                         atomic_set(&tsk->dupl_rcvcnt, 0);
1799                 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1800                 if (likely(!sk_add_backlog(sk, skb, limit)))
1801                         skb = NULL;
1802                 else
1803                         err = -TIPC_ERR_OVERLOAD;
1804         }
1805         spin_unlock_bh(&sk->sk_lock.slock);
1806         sock_put(sk);
1807 exit:
1808         if (unlikely(skb)) {
1809                 tn = net_generic(net, tipc_net_id);
1810                 if (!err || tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1811                         tipc_link_xmit_skb(net, skb, dnode, 0);
1812         }
1813         return err ? -EHOSTUNREACH : 0;
1814 }
1815
1816 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1817 {
1818         struct sock *sk = sock->sk;
1819         DEFINE_WAIT(wait);
1820         int done;
1821
1822         do {
1823                 int err = sock_error(sk);
1824                 if (err)
1825                         return err;
1826                 if (!*timeo_p)
1827                         return -ETIMEDOUT;
1828                 if (signal_pending(current))
1829                         return sock_intr_errno(*timeo_p);
1830
1831                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1832                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1833                 finish_wait(sk_sleep(sk), &wait);
1834         } while (!done);
1835         return 0;
1836 }
1837
1838 /**
1839  * tipc_connect - establish a connection to another TIPC port
1840  * @sock: socket structure
1841  * @dest: socket address for destination port
1842  * @destlen: size of socket address data structure
1843  * @flags: file-related flags associated with socket
1844  *
1845  * Returns 0 on success, errno otherwise
1846  */
1847 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1848                         int destlen, int flags)
1849 {
1850         struct sock *sk = sock->sk;
1851         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1852         struct msghdr m = {NULL,};
1853         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1854         socket_state previous;
1855         int res;
1856
1857         lock_sock(sk);
1858
1859         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1860         if (sock->state == SS_READY) {
1861                 res = -EOPNOTSUPP;
1862                 goto exit;
1863         }
1864
1865         /*
1866          * Reject connection attempt using multicast address
1867          *
1868          * Note: send_msg() validates the rest of the address fields,
1869          *       so there's no need to do it here
1870          */
1871         if (dst->addrtype == TIPC_ADDR_MCAST) {
1872                 res = -EINVAL;
1873                 goto exit;
1874         }
1875
1876         previous = sock->state;
1877         switch (sock->state) {
1878         case SS_UNCONNECTED:
1879                 /* Send a 'SYN-' to destination */
1880                 m.msg_name = dest;
1881                 m.msg_namelen = destlen;
1882
1883                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1884                  * indicate send_msg() is never blocked.
1885                  */
1886                 if (!timeout)
1887                         m.msg_flags = MSG_DONTWAIT;
1888
1889                 res = tipc_sendmsg(NULL, sock, &m, 0);
1890                 if ((res < 0) && (res != -EWOULDBLOCK))
1891                         goto exit;
1892
1893                 /* Just entered SS_CONNECTING state; the only
1894                  * difference is that return value in non-blocking
1895                  * case is EINPROGRESS, rather than EALREADY.
1896                  */
1897                 res = -EINPROGRESS;
1898         case SS_CONNECTING:
1899                 if (previous == SS_CONNECTING)
1900                         res = -EALREADY;
1901                 if (!timeout)
1902                         goto exit;
1903                 timeout = msecs_to_jiffies(timeout);
1904                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1905                 res = tipc_wait_for_connect(sock, &timeout);
1906                 break;
1907         case SS_CONNECTED:
1908                 res = -EISCONN;
1909                 break;
1910         default:
1911                 res = -EINVAL;
1912                 break;
1913         }
1914 exit:
1915         release_sock(sk);
1916         return res;
1917 }
1918
1919 /**
1920  * tipc_listen - allow socket to listen for incoming connections
1921  * @sock: socket structure
1922  * @len: (unused)
1923  *
1924  * Returns 0 on success, errno otherwise
1925  */
1926 static int tipc_listen(struct socket *sock, int len)
1927 {
1928         struct sock *sk = sock->sk;
1929         int res;
1930
1931         lock_sock(sk);
1932
1933         if (sock->state != SS_UNCONNECTED)
1934                 res = -EINVAL;
1935         else {
1936                 sock->state = SS_LISTENING;
1937                 res = 0;
1938         }
1939
1940         release_sock(sk);
1941         return res;
1942 }
1943
1944 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1945 {
1946         struct sock *sk = sock->sk;
1947         DEFINE_WAIT(wait);
1948         int err;
1949
1950         /* True wake-one mechanism for incoming connections: only
1951          * one process gets woken up, not the 'whole herd'.
1952          * Since we do not 'race & poll' for established sockets
1953          * anymore, the common case will execute the loop only once.
1954         */
1955         for (;;) {
1956                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1957                                           TASK_INTERRUPTIBLE);
1958                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1959                         release_sock(sk);
1960                         timeo = schedule_timeout(timeo);
1961                         lock_sock(sk);
1962                 }
1963                 err = 0;
1964                 if (!skb_queue_empty(&sk->sk_receive_queue))
1965                         break;
1966                 err = -EINVAL;
1967                 if (sock->state != SS_LISTENING)
1968                         break;
1969                 err = sock_intr_errno(timeo);
1970                 if (signal_pending(current))
1971                         break;
1972                 err = -EAGAIN;
1973                 if (!timeo)
1974                         break;
1975         }
1976         finish_wait(sk_sleep(sk), &wait);
1977         return err;
1978 }
1979
1980 /**
1981  * tipc_accept - wait for connection request
1982  * @sock: listening socket
1983  * @newsock: new socket that is to be connected
1984  * @flags: file-related flags associated with socket
1985  *
1986  * Returns 0 on success, errno otherwise
1987  */
1988 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1989 {
1990         struct sock *new_sk, *sk = sock->sk;
1991         struct sk_buff *buf;
1992         struct tipc_sock *new_tsock;
1993         struct tipc_msg *msg;
1994         long timeo;
1995         int res;
1996
1997         lock_sock(sk);
1998
1999         if (sock->state != SS_LISTENING) {
2000                 res = -EINVAL;
2001                 goto exit;
2002         }
2003         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2004         res = tipc_wait_for_accept(sock, timeo);
2005         if (res)
2006                 goto exit;
2007
2008         buf = skb_peek(&sk->sk_receive_queue);
2009
2010         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2011         if (res)
2012                 goto exit;
2013
2014         new_sk = new_sock->sk;
2015         new_tsock = tipc_sk(new_sk);
2016         msg = buf_msg(buf);
2017
2018         /* we lock on new_sk; but lockdep sees the lock on sk */
2019         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2020
2021         /*
2022          * Reject any stray messages received by new socket
2023          * before the socket lock was taken (very, very unlikely)
2024          */
2025         tsk_rej_rx_queue(new_sk);
2026
2027         /* Connect new socket to it's peer */
2028         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2029         new_sock->state = SS_CONNECTED;
2030
2031         tsk_set_importance(new_tsock, msg_importance(msg));
2032         if (msg_named(msg)) {
2033                 new_tsock->conn_type = msg_nametype(msg);
2034                 new_tsock->conn_instance = msg_nameinst(msg);
2035         }
2036
2037         /*
2038          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2039          * Respond to 'SYN+' by queuing it on new socket.
2040          */
2041         if (!msg_data_sz(msg)) {
2042                 struct msghdr m = {NULL,};
2043
2044                 tsk_advance_rx_queue(sk);
2045                 tipc_send_packet(NULL, new_sock, &m, 0);
2046         } else {
2047                 __skb_dequeue(&sk->sk_receive_queue);
2048                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2049                 skb_set_owner_r(buf, new_sk);
2050         }
2051         release_sock(new_sk);
2052 exit:
2053         release_sock(sk);
2054         return res;
2055 }
2056
2057 /**
2058  * tipc_shutdown - shutdown socket connection
2059  * @sock: socket structure
2060  * @how: direction to close (must be SHUT_RDWR)
2061  *
2062  * Terminates connection (if necessary), then purges socket's receive queue.
2063  *
2064  * Returns 0 on success, errno otherwise
2065  */
2066 static int tipc_shutdown(struct socket *sock, int how)
2067 {
2068         struct sock *sk = sock->sk;
2069         struct net *net = sock_net(sk);
2070         struct tipc_sock *tsk = tipc_sk(sk);
2071         struct sk_buff *skb;
2072         u32 dnode;
2073         int res;
2074
2075         if (how != SHUT_RDWR)
2076                 return -EINVAL;
2077
2078         lock_sock(sk);
2079
2080         switch (sock->state) {
2081         case SS_CONNECTING:
2082         case SS_CONNECTED:
2083
2084 restart:
2085                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2086                 skb = __skb_dequeue(&sk->sk_receive_queue);
2087                 if (skb) {
2088                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2089                                 kfree_skb(skb);
2090                                 goto restart;
2091                         }
2092                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
2093                                              TIPC_CONN_SHUTDOWN))
2094                                 tipc_link_xmit_skb(net, skb, dnode,
2095                                                    tsk->portid);
2096                         tipc_node_remove_conn(net, dnode, tsk->portid);
2097                 } else {
2098                         dnode = tsk_peer_node(tsk);
2099
2100                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2101                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2102                                               0, dnode, tsk_own_node(tsk),
2103                                               tsk_peer_port(tsk),
2104                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2105                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
2106                 }
2107                 tsk->connected = 0;
2108                 sock->state = SS_DISCONNECTING;
2109                 tipc_node_remove_conn(net, dnode, tsk->portid);
2110                 /* fall through */
2111
2112         case SS_DISCONNECTING:
2113
2114                 /* Discard any unreceived messages */
2115                 __skb_queue_purge(&sk->sk_receive_queue);
2116
2117                 /* Wake up anyone sleeping in poll */
2118                 sk->sk_state_change(sk);
2119                 res = 0;
2120                 break;
2121
2122         default:
2123                 res = -ENOTCONN;
2124         }
2125
2126         release_sock(sk);
2127         return res;
2128 }
2129
2130 static void tipc_sk_timeout(unsigned long data)
2131 {
2132         struct tipc_sock *tsk = (struct tipc_sock *)data;
2133         struct sock *sk = &tsk->sk;
2134         struct sk_buff *skb = NULL;
2135         u32 peer_port, peer_node;
2136         u32 own_node = tsk_own_node(tsk);
2137
2138         bh_lock_sock(sk);
2139         if (!tsk->connected) {
2140                 bh_unlock_sock(sk);
2141                 goto exit;
2142         }
2143         peer_port = tsk_peer_port(tsk);
2144         peer_node = tsk_peer_node(tsk);
2145
2146         if (tsk->probing_state == TIPC_CONN_PROBING) {
2147                 /* Previous probe not answered -> self abort */
2148                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2149                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0,
2150                                       own_node, peer_node, tsk->portid,
2151                                       peer_port, TIPC_ERR_NO_PORT);
2152         } else {
2153                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2154                                       INT_H_SIZE, 0, peer_node, own_node,
2155                                       peer_port, tsk->portid, TIPC_OK);
2156                 tsk->probing_state = TIPC_CONN_PROBING;
2157                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2158         }
2159         bh_unlock_sock(sk);
2160         if (skb)
2161                 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2162 exit:
2163         sock_put(sk);
2164 }
2165
2166 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2167                            struct tipc_name_seq const *seq)
2168 {
2169         struct net *net = sock_net(&tsk->sk);
2170         struct publication *publ;
2171         u32 key;
2172
2173         if (tsk->connected)
2174                 return -EINVAL;
2175         key = tsk->portid + tsk->pub_count + 1;
2176         if (key == tsk->portid)
2177                 return -EADDRINUSE;
2178
2179         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2180                                     scope, tsk->portid, key);
2181         if (unlikely(!publ))
2182                 return -EINVAL;
2183
2184         list_add(&publ->pport_list, &tsk->publications);
2185         tsk->pub_count++;
2186         tsk->published = 1;
2187         return 0;
2188 }
2189
2190 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2191                             struct tipc_name_seq const *seq)
2192 {
2193         struct net *net = sock_net(&tsk->sk);
2194         struct publication *publ;
2195         struct publication *safe;
2196         int rc = -EINVAL;
2197
2198         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2199                 if (seq) {
2200                         if (publ->scope != scope)
2201                                 continue;
2202                         if (publ->type != seq->type)
2203                                 continue;
2204                         if (publ->lower != seq->lower)
2205                                 continue;
2206                         if (publ->upper != seq->upper)
2207                                 break;
2208                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2209                                               publ->ref, publ->key);
2210                         rc = 0;
2211                         break;
2212                 }
2213                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2214                                       publ->ref, publ->key);
2215                 rc = 0;
2216         }
2217         if (list_empty(&tsk->publications))
2218                 tsk->published = 0;
2219         return rc;
2220 }
2221
2222 static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2223                         int len, int full_id)
2224 {
2225         struct net *net = sock_net(&tsk->sk);
2226         struct tipc_net *tn = net_generic(net, tipc_net_id);
2227         struct publication *publ;
2228         int ret;
2229
2230         if (full_id)
2231                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2232                                     tipc_zone(tn->own_addr),
2233                                     tipc_cluster(tn->own_addr),
2234                                     tipc_node(tn->own_addr), tsk->portid);
2235         else
2236                 ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
2237
2238         if (tsk->connected) {
2239                 u32 dport = tsk_peer_port(tsk);
2240                 u32 destnode = tsk_peer_node(tsk);
2241
2242                 ret += tipc_snprintf(buf + ret, len - ret,
2243                                      " connected to <%u.%u.%u:%u>",
2244                                      tipc_zone(destnode),
2245                                      tipc_cluster(destnode),
2246                                      tipc_node(destnode), dport);
2247                 if (tsk->conn_type != 0)
2248                         ret += tipc_snprintf(buf + ret, len - ret,
2249                                              " via {%u,%u}", tsk->conn_type,
2250                                              tsk->conn_instance);
2251         } else if (tsk->published) {
2252                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2253                 list_for_each_entry(publ, &tsk->publications, pport_list) {
2254                         if (publ->lower == publ->upper)
2255                                 ret += tipc_snprintf(buf + ret, len - ret,
2256                                                      " {%u,%u}", publ->type,
2257                                                      publ->lower);
2258                         else
2259                                 ret += tipc_snprintf(buf + ret, len - ret,
2260                                                      " {%u,%u,%u}", publ->type,
2261                                                      publ->lower, publ->upper);
2262                 }
2263         }
2264         ret += tipc_snprintf(buf + ret, len - ret, "\n");
2265         return ret;
2266 }
2267
2268 struct sk_buff *tipc_sk_socks_show(struct net *net)
2269 {
2270         struct tipc_net *tn = net_generic(net, tipc_net_id);
2271         const struct bucket_table *tbl;
2272         struct rhash_head *pos;
2273         struct sk_buff *buf;
2274         struct tlv_desc *rep_tlv;
2275         char *pb;
2276         int pb_len;
2277         struct tipc_sock *tsk;
2278         int str_len = 0;
2279         int i;
2280
2281         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2282         if (!buf)
2283                 return NULL;
2284         rep_tlv = (struct tlv_desc *)buf->data;
2285         pb = TLV_DATA(rep_tlv);
2286         pb_len = ULTRA_STRING_MAX_LEN;
2287
2288         rcu_read_lock();
2289         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2290         for (i = 0; i < tbl->size; i++) {
2291                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2292                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2293                         str_len += tipc_sk_show(tsk, pb + str_len,
2294                                                 pb_len - str_len, 0);
2295                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2296                 }
2297         }
2298         rcu_read_unlock();
2299
2300         str_len += 1;   /* for "\0" */
2301         skb_put(buf, TLV_SPACE(str_len));
2302         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2303
2304         return buf;
2305 }
2306
2307 /* tipc_sk_reinit: set non-zero address in all existing sockets
2308  *                 when we go from standalone to network mode.
2309  */
2310 void tipc_sk_reinit(struct net *net)
2311 {
2312         struct tipc_net *tn = net_generic(net, tipc_net_id);
2313         const struct bucket_table *tbl;
2314         struct rhash_head *pos;
2315         struct tipc_sock *tsk;
2316         struct tipc_msg *msg;
2317         int i;
2318
2319         rcu_read_lock();
2320         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2321         for (i = 0; i < tbl->size; i++) {
2322                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2323                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2324                         msg = &tsk->phdr;
2325                         msg_set_prevnode(msg, tn->own_addr);
2326                         msg_set_orignode(msg, tn->own_addr);
2327                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2328                 }
2329         }
2330         rcu_read_unlock();
2331 }
2332
2333 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2334 {
2335         struct tipc_net *tn = net_generic(net, tipc_net_id);
2336         struct tipc_sock *tsk;
2337
2338         rcu_read_lock();
2339         tsk = rhashtable_lookup(&tn->sk_rht, &portid);
2340         if (tsk)
2341                 sock_hold(&tsk->sk);
2342         rcu_read_unlock();
2343
2344         return tsk;
2345 }
2346
2347 static int tipc_sk_insert(struct tipc_sock *tsk)
2348 {
2349         struct sock *sk = &tsk->sk;
2350         struct net *net = sock_net(sk);
2351         struct tipc_net *tn = net_generic(net, tipc_net_id);
2352         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2353         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2354
2355         while (remaining--) {
2356                 portid++;
2357                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2358                         portid = TIPC_MIN_PORT;
2359                 tsk->portid = portid;
2360                 sock_hold(&tsk->sk);
2361                 if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
2362                         return 0;
2363                 sock_put(&tsk->sk);
2364         }
2365
2366         return -1;
2367 }
2368
2369 static void tipc_sk_remove(struct tipc_sock *tsk)
2370 {
2371         struct sock *sk = &tsk->sk;
2372         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2373
2374         if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
2375                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2376                 __sock_put(sk);
2377         }
2378 }
2379
2380 int tipc_sk_rht_init(struct net *net)
2381 {
2382         struct tipc_net *tn = net_generic(net, tipc_net_id);
2383         struct rhashtable_params rht_params = {
2384                 .nelem_hint = 192,
2385                 .head_offset = offsetof(struct tipc_sock, node),
2386                 .key_offset = offsetof(struct tipc_sock, portid),
2387                 .key_len = sizeof(u32), /* portid */
2388                 .hashfn = jhash,
2389                 .max_shift = 20, /* 1M */
2390                 .min_shift = 8,  /* 256 */
2391                 .grow_decision = rht_grow_above_75,
2392                 .shrink_decision = rht_shrink_below_30,
2393         };
2394
2395         return rhashtable_init(&tn->sk_rht, &rht_params);
2396 }
2397
2398 void tipc_sk_rht_destroy(struct net *net)
2399 {
2400         struct tipc_net *tn = net_generic(net, tipc_net_id);
2401
2402         /* Wait for socket readers to complete */
2403         synchronize_net();
2404
2405         rhashtable_destroy(&tn->sk_rht);
2406 }
2407
2408 /**
2409  * tipc_setsockopt - set socket option
2410  * @sock: socket structure
2411  * @lvl: option level
2412  * @opt: option identifier
2413  * @ov: pointer to new option value
2414  * @ol: length of option value
2415  *
2416  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2417  * (to ease compatibility).
2418  *
2419  * Returns 0 on success, errno otherwise
2420  */
2421 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2422                            char __user *ov, unsigned int ol)
2423 {
2424         struct sock *sk = sock->sk;
2425         struct tipc_sock *tsk = tipc_sk(sk);
2426         u32 value;
2427         int res;
2428
2429         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2430                 return 0;
2431         if (lvl != SOL_TIPC)
2432                 return -ENOPROTOOPT;
2433         if (ol < sizeof(value))
2434                 return -EINVAL;
2435         res = get_user(value, (u32 __user *)ov);
2436         if (res)
2437                 return res;
2438
2439         lock_sock(sk);
2440
2441         switch (opt) {
2442         case TIPC_IMPORTANCE:
2443                 res = tsk_set_importance(tsk, value);
2444                 break;
2445         case TIPC_SRC_DROPPABLE:
2446                 if (sock->type != SOCK_STREAM)
2447                         tsk_set_unreliable(tsk, value);
2448                 else
2449                         res = -ENOPROTOOPT;
2450                 break;
2451         case TIPC_DEST_DROPPABLE:
2452                 tsk_set_unreturnable(tsk, value);
2453                 break;
2454         case TIPC_CONN_TIMEOUT:
2455                 tipc_sk(sk)->conn_timeout = value;
2456                 /* no need to set "res", since already 0 at this point */
2457                 break;
2458         default:
2459                 res = -EINVAL;
2460         }
2461
2462         release_sock(sk);
2463
2464         return res;
2465 }
2466
2467 /**
2468  * tipc_getsockopt - get socket option
2469  * @sock: socket structure
2470  * @lvl: option level
2471  * @opt: option identifier
2472  * @ov: receptacle for option value
2473  * @ol: receptacle for length of option value
2474  *
2475  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2476  * (to ease compatibility).
2477  *
2478  * Returns 0 on success, errno otherwise
2479  */
2480 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2481                            char __user *ov, int __user *ol)
2482 {
2483         struct sock *sk = sock->sk;
2484         struct tipc_sock *tsk = tipc_sk(sk);
2485         int len;
2486         u32 value;
2487         int res;
2488
2489         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2490                 return put_user(0, ol);
2491         if (lvl != SOL_TIPC)
2492                 return -ENOPROTOOPT;
2493         res = get_user(len, ol);
2494         if (res)
2495                 return res;
2496
2497         lock_sock(sk);
2498
2499         switch (opt) {
2500         case TIPC_IMPORTANCE:
2501                 value = tsk_importance(tsk);
2502                 break;
2503         case TIPC_SRC_DROPPABLE:
2504                 value = tsk_unreliable(tsk);
2505                 break;
2506         case TIPC_DEST_DROPPABLE:
2507                 value = tsk_unreturnable(tsk);
2508                 break;
2509         case TIPC_CONN_TIMEOUT:
2510                 value = tsk->conn_timeout;
2511                 /* no need to set "res", since already 0 at this point */
2512                 break;
2513         case TIPC_NODE_RECVQ_DEPTH:
2514                 value = 0; /* was tipc_queue_size, now obsolete */
2515                 break;
2516         case TIPC_SOCK_RECVQ_DEPTH:
2517                 value = skb_queue_len(&sk->sk_receive_queue);
2518                 break;
2519         default:
2520                 res = -EINVAL;
2521         }
2522
2523         release_sock(sk);
2524
2525         if (res)
2526                 return res;     /* "get" failed */
2527
2528         if (len < sizeof(value))
2529                 return -EINVAL;
2530
2531         if (copy_to_user(ov, &value, sizeof(value)))
2532                 return -EFAULT;
2533
2534         return put_user(sizeof(value), ol);
2535 }
2536
2537 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2538 {
2539         struct sock *sk = sock->sk;
2540         struct tipc_sioc_ln_req lnr;
2541         void __user *argp = (void __user *)arg;
2542
2543         switch (cmd) {
2544         case SIOCGETLINKNAME:
2545                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2546                         return -EFAULT;
2547                 if (!tipc_node_get_linkname(sock_net(sk),
2548                                             lnr.bearer_id & 0xffff, lnr.peer,
2549                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2550                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2551                                 return -EFAULT;
2552                         return 0;
2553                 }
2554                 return -EADDRNOTAVAIL;
2555         default:
2556                 return -ENOIOCTLCMD;
2557         }
2558 }
2559
2560 /* Protocol switches for the various types of TIPC sockets */
2561
2562 static const struct proto_ops msg_ops = {
2563         .owner          = THIS_MODULE,
2564         .family         = AF_TIPC,
2565         .release        = tipc_release,
2566         .bind           = tipc_bind,
2567         .connect        = tipc_connect,
2568         .socketpair     = sock_no_socketpair,
2569         .accept         = sock_no_accept,
2570         .getname        = tipc_getname,
2571         .poll           = tipc_poll,
2572         .ioctl          = tipc_ioctl,
2573         .listen         = sock_no_listen,
2574         .shutdown       = tipc_shutdown,
2575         .setsockopt     = tipc_setsockopt,
2576         .getsockopt     = tipc_getsockopt,
2577         .sendmsg        = tipc_sendmsg,
2578         .recvmsg        = tipc_recvmsg,
2579         .mmap           = sock_no_mmap,
2580         .sendpage       = sock_no_sendpage
2581 };
2582
2583 static const struct proto_ops packet_ops = {
2584         .owner          = THIS_MODULE,
2585         .family         = AF_TIPC,
2586         .release        = tipc_release,
2587         .bind           = tipc_bind,
2588         .connect        = tipc_connect,
2589         .socketpair     = sock_no_socketpair,
2590         .accept         = tipc_accept,
2591         .getname        = tipc_getname,
2592         .poll           = tipc_poll,
2593         .ioctl          = tipc_ioctl,
2594         .listen         = tipc_listen,
2595         .shutdown       = tipc_shutdown,
2596         .setsockopt     = tipc_setsockopt,
2597         .getsockopt     = tipc_getsockopt,
2598         .sendmsg        = tipc_send_packet,
2599         .recvmsg        = tipc_recvmsg,
2600         .mmap           = sock_no_mmap,
2601         .sendpage       = sock_no_sendpage
2602 };
2603
2604 static const struct proto_ops stream_ops = {
2605         .owner          = THIS_MODULE,
2606         .family         = AF_TIPC,
2607         .release        = tipc_release,
2608         .bind           = tipc_bind,
2609         .connect        = tipc_connect,
2610         .socketpair     = sock_no_socketpair,
2611         .accept         = tipc_accept,
2612         .getname        = tipc_getname,
2613         .poll           = tipc_poll,
2614         .ioctl          = tipc_ioctl,
2615         .listen         = tipc_listen,
2616         .shutdown       = tipc_shutdown,
2617         .setsockopt     = tipc_setsockopt,
2618         .getsockopt     = tipc_getsockopt,
2619         .sendmsg        = tipc_send_stream,
2620         .recvmsg        = tipc_recv_stream,
2621         .mmap           = sock_no_mmap,
2622         .sendpage       = sock_no_sendpage
2623 };
2624
2625 static const struct net_proto_family tipc_family_ops = {
2626         .owner          = THIS_MODULE,
2627         .family         = AF_TIPC,
2628         .create         = tipc_sk_create
2629 };
2630
2631 static struct proto tipc_proto = {
2632         .name           = "TIPC",
2633         .owner          = THIS_MODULE,
2634         .obj_size       = sizeof(struct tipc_sock),
2635         .sysctl_rmem    = sysctl_tipc_rmem
2636 };
2637
2638 static struct proto tipc_proto_kern = {
2639         .name           = "TIPC",
2640         .obj_size       = sizeof(struct tipc_sock),
2641         .sysctl_rmem    = sysctl_tipc_rmem
2642 };
2643
2644 /**
2645  * tipc_socket_init - initialize TIPC socket interface
2646  *
2647  * Returns 0 on success, errno otherwise
2648  */
2649 int tipc_socket_init(void)
2650 {
2651         int res;
2652
2653         res = proto_register(&tipc_proto, 1);
2654         if (res) {
2655                 pr_err("Failed to register TIPC protocol type\n");
2656                 goto out;
2657         }
2658
2659         res = sock_register(&tipc_family_ops);
2660         if (res) {
2661                 pr_err("Failed to register TIPC socket type\n");
2662                 proto_unregister(&tipc_proto);
2663                 goto out;
2664         }
2665  out:
2666         return res;
2667 }
2668
2669 /**
2670  * tipc_socket_stop - stop TIPC socket interface
2671  */
2672 void tipc_socket_stop(void)
2673 {
2674         sock_unregister(tipc_family_ops.family);
2675         proto_unregister(&tipc_proto);
2676 }
2677
2678 /* Caller should hold socket lock for the passed tipc socket. */
2679 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2680 {
2681         u32 peer_node;
2682         u32 peer_port;
2683         struct nlattr *nest;
2684
2685         peer_node = tsk_peer_node(tsk);
2686         peer_port = tsk_peer_port(tsk);
2687
2688         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2689
2690         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2691                 goto msg_full;
2692         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2693                 goto msg_full;
2694
2695         if (tsk->conn_type != 0) {
2696                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2697                         goto msg_full;
2698                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2699                         goto msg_full;
2700                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2701                         goto msg_full;
2702         }
2703         nla_nest_end(skb, nest);
2704
2705         return 0;
2706
2707 msg_full:
2708         nla_nest_cancel(skb, nest);
2709
2710         return -EMSGSIZE;
2711 }
2712
2713 /* Caller should hold socket lock for the passed tipc socket. */
2714 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2715                             struct tipc_sock *tsk)
2716 {
2717         int err;
2718         void *hdr;
2719         struct nlattr *attrs;
2720         struct net *net = sock_net(skb->sk);
2721         struct tipc_net *tn = net_generic(net, tipc_net_id);
2722
2723         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2724                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2725         if (!hdr)
2726                 goto msg_cancel;
2727
2728         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2729         if (!attrs)
2730                 goto genlmsg_cancel;
2731         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2732                 goto attr_msg_cancel;
2733         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2734                 goto attr_msg_cancel;
2735
2736         if (tsk->connected) {
2737                 err = __tipc_nl_add_sk_con(skb, tsk);
2738                 if (err)
2739                         goto attr_msg_cancel;
2740         } else if (!list_empty(&tsk->publications)) {
2741                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2742                         goto attr_msg_cancel;
2743         }
2744         nla_nest_end(skb, attrs);
2745         genlmsg_end(skb, hdr);
2746
2747         return 0;
2748
2749 attr_msg_cancel:
2750         nla_nest_cancel(skb, attrs);
2751 genlmsg_cancel:
2752         genlmsg_cancel(skb, hdr);
2753 msg_cancel:
2754         return -EMSGSIZE;
2755 }
2756
2757 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2758 {
2759         int err;
2760         struct tipc_sock *tsk;
2761         const struct bucket_table *tbl;
2762         struct rhash_head *pos;
2763         struct net *net = sock_net(skb->sk);
2764         struct tipc_net *tn = net_generic(net, tipc_net_id);
2765         u32 tbl_id = cb->args[0];
2766         u32 prev_portid = cb->args[1];
2767
2768         rcu_read_lock();
2769         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2770         for (; tbl_id < tbl->size; tbl_id++) {
2771                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2772                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2773                         if (prev_portid && prev_portid != tsk->portid) {
2774                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2775                                 continue;
2776                         }
2777
2778                         err = __tipc_nl_add_sk(skb, cb, tsk);
2779                         if (err) {
2780                                 prev_portid = tsk->portid;
2781                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2782                                 goto out;
2783                         }
2784                         prev_portid = 0;
2785                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2786                 }
2787         }
2788 out:
2789         rcu_read_unlock();
2790         cb->args[0] = tbl_id;
2791         cb->args[1] = prev_portid;
2792
2793         return skb->len;
2794 }
2795
2796 /* Caller should hold socket lock for the passed tipc socket. */
2797 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2798                                  struct netlink_callback *cb,
2799                                  struct publication *publ)
2800 {
2801         void *hdr;
2802         struct nlattr *attrs;
2803
2804         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2805                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2806         if (!hdr)
2807                 goto msg_cancel;
2808
2809         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2810         if (!attrs)
2811                 goto genlmsg_cancel;
2812
2813         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2814                 goto attr_msg_cancel;
2815         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2816                 goto attr_msg_cancel;
2817         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2818                 goto attr_msg_cancel;
2819         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2820                 goto attr_msg_cancel;
2821
2822         nla_nest_end(skb, attrs);
2823         genlmsg_end(skb, hdr);
2824
2825         return 0;
2826
2827 attr_msg_cancel:
2828         nla_nest_cancel(skb, attrs);
2829 genlmsg_cancel:
2830         genlmsg_cancel(skb, hdr);
2831 msg_cancel:
2832         return -EMSGSIZE;
2833 }
2834
2835 /* Caller should hold socket lock for the passed tipc socket. */
2836 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2837                                   struct netlink_callback *cb,
2838                                   struct tipc_sock *tsk, u32 *last_publ)
2839 {
2840         int err;
2841         struct publication *p;
2842
2843         if (*last_publ) {
2844                 list_for_each_entry(p, &tsk->publications, pport_list) {
2845                         if (p->key == *last_publ)
2846                                 break;
2847                 }
2848                 if (p->key != *last_publ) {
2849                         /* We never set seq or call nl_dump_check_consistent()
2850                          * this means that setting prev_seq here will cause the
2851                          * consistence check to fail in the netlink callback
2852                          * handler. Resulting in the last NLMSG_DONE message
2853                          * having the NLM_F_DUMP_INTR flag set.
2854                          */
2855                         cb->prev_seq = 1;
2856                         *last_publ = 0;
2857                         return -EPIPE;
2858                 }
2859         } else {
2860                 p = list_first_entry(&tsk->publications, struct publication,
2861                                      pport_list);
2862         }
2863
2864         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2865                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2866                 if (err) {
2867                         *last_publ = p->key;
2868                         return err;
2869                 }
2870         }
2871         *last_publ = 0;
2872
2873         return 0;
2874 }
2875
2876 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2877 {
2878         int err;
2879         u32 tsk_portid = cb->args[0];
2880         u32 last_publ = cb->args[1];
2881         u32 done = cb->args[2];
2882         struct net *net = sock_net(skb->sk);
2883         struct tipc_sock *tsk;
2884
2885         if (!tsk_portid) {
2886                 struct nlattr **attrs;
2887                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2888
2889                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2890                 if (err)
2891                         return err;
2892
2893                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2894                                        attrs[TIPC_NLA_SOCK],
2895                                        tipc_nl_sock_policy);
2896                 if (err)
2897                         return err;
2898
2899                 if (!sock[TIPC_NLA_SOCK_REF])
2900                         return -EINVAL;
2901
2902                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2903         }
2904
2905         if (done)
2906                 return 0;
2907
2908         tsk = tipc_sk_lookup(net, tsk_portid);
2909         if (!tsk)
2910                 return -EINVAL;
2911
2912         lock_sock(&tsk->sk);
2913         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2914         if (!err)
2915                 done = 1;
2916         release_sock(&tsk->sk);
2917         sock_put(&tsk->sk);
2918
2919         cb->args[0] = tsk_portid;
2920         cb->args[1] = last_publ;
2921         cb->args[2] = done;
2922
2923         return skb->len;
2924 }