/*
 * net/tipc/socket.c: TIPC socket API
 *
 * Copyright (c) 2001-2007, 2012-2019, Ericsson AB
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
 * Copyright (c) 2020-2021, Red Hat Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/rhashtable.h>
#include <linux/sched/signal.h>

#include "core.h"
#include "name_table.h"
#include "node.h"
#include "link.h"
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "netlink.h"
#include "group.h"
#include "trace.h"

#define NAGLE_START_INIT	4
#define NAGLE_START_MAX		1024
#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_MAX_PORT		0xffffffff
#define TIPC_MIN_PORT		1
#define TIPC_ACK_RATE		4       /* ACK at 1/4 of rcv window size */

enum {
	TIPC_LISTEN = TCP_LISTEN,
	TIPC_ESTABLISHED = TCP_ESTABLISHED,
	TIPC_OPEN = TCP_CLOSE,
	TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
	TIPC_CONNECTING = TCP_SYN_SENT,
};

struct sockaddr_pair {
	struct sockaddr_tipc sock;
	struct sockaddr_tipc member;
};

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @maxnagle: maximum size of msg which can be subject to nagle
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @blocking_link: address of the congested link we are currently sleeping on
 * @pub_count: total # of publications port has made during its lifetime
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @probe_unacked: probe has not received ack yet
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window size
 * @peer_caps: peer capabilities mask
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window size
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: TIPC communications group
 * @oneway: message count in one direction (FIXME)
 * @nagle_start: current nagle value
 * @snd_backlog: send backlog count
 * @msg_acc: messages accepted; used in managing backlog and nagle
 * @pkt_cnt: TIPC socket packet count
 * @expect_ack: whether this TIPC socket is expecting an ack
 * @nodelay: setsockopt() TIPC_NODELAY setting
 * @group_is_open: TIPC socket group is fully open (FIXME)
 * @published: true if port has one or more associated names
 * @conn_addrtype: address type used when establishing connection
 */
struct tipc_sock {
	struct sock sk;
	u32 max_pkt;
	u32 maxnagle;
	u32 portid;
	struct tipc_msg phdr;
	struct list_head cong_links;
	struct list_head publications;
	u32 pub_count;
	atomic_t dupl_rcvcnt;
	u16 conn_timeout;
	bool probe_unacked;
	u16 cong_link_cnt;
	u16 snt_unacked;
	u16 snd_win;
	u16 peer_caps;
	u16 rcv_unacked;
	u16 rcv_win;
	struct sockaddr_tipc peer;
	struct rhash_head node;
	struct tipc_mc_method mc_method;
	struct rcu_head rcu;
	struct tipc_group *group;
	u32 oneway;
	u32 nagle_start;
	u16 snd_backlog;
	u16 msg_acc;
	u16 pkt_cnt;
	bool expect_ack;
	bool nodelay;
	bool group_is_open;
	bool published;
	u8 conn_addrtype;
};

static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
		       bool kern);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;

static u32 tsk_own_node(struct tipc_sock *tsk)
{
	return msg_prevnode(&tsk->phdr);
}

static u32 tsk_peer_node(struct tipc_sock *tsk)
{
	return msg_destnode(&tsk->phdr);
}

static u32 tsk_peer_port(struct tipc_sock *tsk)
{
	return msg_destport(&tsk->phdr);
}

static bool tsk_unreliable(struct tipc_sock *tsk)
{
	return msg_src_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
{
	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
}

static bool tsk_unreturnable(struct tipc_sock *tsk)
{
	return msg_dest_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
{
	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
}

static int tsk_importance(struct tipc_sock *tsk)
{
	return msg_importance(&tsk->phdr);
}

static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	return container_of(sk, struct tipc_sock, sk);
}

int tsk_set_importance(struct sock *sk, int imp)
{
	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;
	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
	return 0;
}

static bool tsk_conn_cong(struct tipc_sock *tsk)
{
	return tsk->snt_unacked > tsk->snd_win;
}

static u16 tsk_blocks(int len)
{
	return ((len / FLOWCTL_BLK_SZ) + 1);
}

/* tsk_adv_blocks(): translate a buffer size in bytes to number of
 * advertisable blocks, taking into account the ratio truesize(len)/len
 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 */
static u16 tsk_adv_blocks(int len)
{
	return len / FLOWCTL_BLK_SZ / 4;
}

/* tsk_inc(): increment counter for sent or received data
 * - If block based flow control is not supported by the peer we
 *   fall back to message based flow control, incrementing the
 *   counter by one per message
 */
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
		return ((msglen / FLOWCTL_BLK_SZ) + 1);
	return 1;
}
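
/* Worked example of the flow control arithmetic above (an editorial
 * sketch, assuming FLOWCTL_BLK_SZ is 1024, its value in socket.h at the
 * time of writing):
 *
 *	tsk_blocks(4000)      == 4000 / 1024 + 1     == 4  blocks charged
 *	tsk_inc(tsk, 66000)   == 66000 / 1024 + 1    == 65 blocks charged
 *	tsk_adv_blocks(65536) == 65536 / 1024 / 4    == 16 blocks advertised
 *
 * The divide-by-4 in tsk_adv_blocks() absorbs the truesize(len)/len
 * overhead noted in its comment; a peer without TIPC_BLOCK_FLOWCTL is
 * instead charged a flat one unit per message by tsk_inc().
 */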

/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
 */
static void tsk_set_nagle(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;

	tsk->maxnagle = 0;
	if (sk->sk_type != SOCK_STREAM)
		return;
	if (tsk->nodelay)
		return;
	if (!(tsk->peer_caps & TIPC_NAGLE))
		return;
	/* Limit node local buffer size to avoid receive queue overflow */
	if (tsk->max_pkt == MAX_MSG_SIZE)
		tsk->maxnagle = 1500;
	else
		tsk->maxnagle = tsk->max_pkt;
}

/**
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 * @sk: network socket
 *
 * Caller must hold socket lock
 */
static void tsk_advance_rx_queue(struct sock *sk)
{
	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
}

/* tipc_sk_respond() : send response message back to sender
 */
static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
{
	u32 selector;
	u32 dnode;
	u32 onode = tipc_own_addr(sock_net(sk));

	if (!tipc_msg_reverse(onode, &skb, err))
		return;

	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
	dnode = msg_destnode(buf_msg(skb));
	selector = msg_origport(buf_msg(skb));
	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}

/**
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 * @sk: network socket
 * @error: response error code
 *
 * Caller must hold socket lock
 */
static void tsk_rej_rx_queue(struct sock *sk, int error)
{
	struct sk_buff *skb;

	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
		tipc_sk_respond(sk, skb, error);
}

static bool tipc_sk_connected(const struct sock *sk)
{
	return READ_ONCE(sk->sk_state) == TIPC_ESTABLISHED;
}

/* tipc_sk_type_connectionless - check if the socket is a connectionless
 * (datagram) socket
 * @sk: socket
 *
 * Returns true if connectionless, false otherwise
 */
static bool tipc_sk_type_connectionless(struct sock *sk)
{
	return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
}

/* tsk_peer_msg - verify if message was sent by connected port's peer
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
{
	struct sock *sk = &tsk->sk;
	u32 self = tipc_own_addr(sock_net(sk));
	u32 peer_port = tsk_peer_port(tsk);
	u32 orig_node, peer_node;

	if (unlikely(!tipc_sk_connected(sk)))
		return false;

	if (unlikely(msg_origport(msg) != peer_port))
		return false;

	orig_node = msg_orignode(msg);
	peer_node = tsk_peer_node(tsk);

	if (likely(orig_node == peer_node))
		return true;

	if (!orig_node && peer_node == self)
		return true;

	if (!peer_node && orig_node == self)
		return true;

	return false;
}

/* tipc_set_sk_state - set the sk_state of the socket
 * @sk: socket
 *
 * Caller must hold socket lock
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_set_sk_state(struct sock *sk, int state)
{
	int oldsk_state = sk->sk_state;
	int res = -EINVAL;

	switch (state) {
	case TIPC_OPEN:
		res = 0;
		break;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_ESTABLISHED:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_DISCONNECTING:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_ESTABLISHED)
			res = 0;
		break;
	}

	if (!res)
		sk->sk_state = state;

	return res;
}
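
/* The transitions accepted by the switch above form this small state
 * machine (an editorial summary derived directly from the code):
 *
 *	any state   -> OPEN
 *	OPEN        -> LISTEN | CONNECTING | ESTABLISHED
 *	CONNECTING  -> ESTABLISHED | DISCONNECTING
 *	ESTABLISHED -> DISCONNECTING
 *
 * Everything else is rejected with -EINVAL and leaves sk_state unchanged.
 */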

static int tipc_sk_sock_err(struct socket *sock, long *timeout)
{
	struct sock *sk = sock->sk;
	int err = sock_error(sk);
	int typ = sock->type;

	if (err)
		return err;
	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
		if (sk->sk_state == TIPC_DISCONNECTING)
			return -EPIPE;
		else if (!tipc_sk_connected(sk))
			return -ENOTCONN;
	}
	if (!*timeout)
		return -EAGAIN;
	if (signal_pending(current))
		return sock_intr_errno(*timeout);

	return 0;
}

#define tipc_wait_for_cond(sock_, timeo_, condition_)			       \
({                                                                             \
	DEFINE_WAIT_FUNC(wait_, woken_wake_function);                          \
	struct sock *sk_;						       \
	int rc_;							       \
									       \
	while ((rc_ = !(condition_))) {					       \
		/* coupled with smp_wmb() in tipc_sk_proto_rcv() */            \
		smp_rmb();                                                     \
		sk_ = (sock_)->sk;					       \
		rc_ = tipc_sk_sock_err((sock_), timeo_);		       \
		if (rc_)						       \
			break;						       \
		add_wait_queue(sk_sleep(sk_), &wait_);                         \
		release_sock(sk_);					       \
		*(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
		sched_annotate_sleep();				               \
		lock_sock(sk_);						       \
		remove_wait_queue(sk_sleep(sk_), &wait_);		       \
	}								       \
	rc_;								       \
})
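
/* Usage pattern (as in the senders later in this file): the caller holds
 * the socket lock, and the macro drops and re-takes it around each sleep,
 * so the condition is always re-evaluated under the lock, e.g.:
 *
 *	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 *	if (unlikely(rc))
 *		return rc;
 *
 * A zero return means the condition became true; otherwise rc_ carries
 * the error from tipc_sk_sock_err() (EAGAIN on expired timeout, etc.).
 */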

/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
{
	const struct proto_ops *ops;
	struct sock *sk;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
	if (sk == NULL)
		return -ENOMEM;

	tsk = tipc_sk(sk);
	tsk->max_pkt = MAX_PKT_DEFAULT;
	tsk->maxnagle = 0;
	tsk->nagle_start = NAGLE_START_INIT;
	INIT_LIST_HEAD(&tsk->publications);
	INIT_LIST_HEAD(&tsk->cong_links);
	msg = &tsk->phdr;

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock_init_data(sock, sk);
	tipc_set_sk_state(sk, TIPC_OPEN);
	if (tipc_sk_insert(tsk)) {
		sk_free(sk);
		pr_warn("Socket create failed; port number exhausted\n");
		return -EINVAL;
	}

	/* Ensure tsk is visible before we read own_addr. */
	smp_mb();

	tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
		      TIPC_NAMED_MSG, NAMED_H_SIZE, 0);

	msg_set_origport(msg, tsk->portid);
	timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
	sk->sk_shutdown = 0;
	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
	sk->sk_rcvbuf = READ_ONCE(sysctl_tipc_rmem[1]);
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	sk->sk_destruct = tipc_sock_destruct;
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	tsk->group_is_open = true;
	atomic_set(&tsk->dupl_rcvcnt, 0);

	/* Start out with safe limits until we receive an advertised window */
	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
	tsk->rcv_win = tsk->snd_win;

	if (tipc_sk_type_connectionless(sk)) {
		tsk_set_unreturnable(tsk, true);
		if (sock->type == SOCK_DGRAM)
			tsk_set_unreliable(tsk, true);
	}
	__skb_queue_head_init(&tsk->mc_method.deferredq);
	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
	return 0;
}
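
/* From userspace, the socket types handled above map onto the standard
 * socket() call (illustrative sketch, not part of this file):
 *
 *	#include <sys/socket.h>
 *	#include <linux/tipc.h>
 *
 *	int rdm = socket(AF_TIPC, SOCK_RDM, 0);        // reliable datagram
 *	int str = socket(AF_TIPC, SOCK_STREAM, 0);     // byte stream
 *	int seq = socket(AF_TIPC, SOCK_SEQPACKET, 0);  // connection, msg based
 *
 * As enforced in tipc_sk_create(), any non-zero protocol argument fails
 * with EPROTONOSUPPORT, and an unsupported type with EPROTOTYPE.
 */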

static void tipc_sk_callback(struct rcu_head *head)
{
	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

	sock_put(&tsk->sk);
}

/* Caller should hold socket lock for the socket. */
static void __tipc_shutdown(struct socket *sock, int error)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
	u32 dnode = tsk_peer_node(tsk);
	struct sk_buff *skb;

	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
					    !tsk_conn_cong(tsk)));

	/* Push out delayed messages if in Nagle mode */
	tipc_sk_push_backlog(tsk, false);
	/* Remove pending SYN */
	__skb_queue_purge(&sk->sk_write_queue);

	/* Remove partially received buffer if any */
	skb = skb_peek(&sk->sk_receive_queue);
	if (skb && TIPC_SKB_CB(skb)->bytes_read) {
		__skb_unlink(skb, &sk->sk_receive_queue);
		kfree_skb(skb);
	}

	/* Reject all unreceived messages if connectionless */
	if (tipc_sk_type_connectionless(sk)) {
		tsk_rej_rx_queue(sk, error);
		return;
	}

	switch (sk->sk_state) {
	case TIPC_CONNECTING:
	case TIPC_ESTABLISHED:
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(net, dnode, tsk->portid);
		/* Send a FIN+/- to its peer */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			__skb_queue_purge(&sk->sk_receive_queue);
			tipc_sk_respond(sk, skb, error);
			break;
		}
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
				      tsk_own_node(tsk), tsk_peer_port(tsk),
				      tsk->portid, error);
		if (skb)
			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
		break;
	case TIPC_LISTEN:
		/* Reject all SYN messages */
		tsk_rej_rx_queue(sk, error);
		break;
	default:
		__skb_queue_purge(&sk->sk_receive_queue);
		break;
	}
}

/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	tsk = tipc_sk(sk);
	lock_sock(sk);

	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
	sk->sk_shutdown = SHUTDOWN_MASK;
	tipc_sk_leave(tsk);
	tipc_sk_withdraw(tsk, NULL);
	__skb_queue_purge(&tsk->mc_method.deferredq);
	sk_stop_timer(sk, &sk->sk_timer);
	tipc_sk_remove(tsk);

	sock_orphan(sk);
	/* Reject any messages that accumulated in backlog queue */
	release_sock(sk);
	tipc_dest_list_purge(&tsk->cong_links);
	tsk->cong_link_cnt = 0;
	call_rcu(&tsk->rcu, tipc_sk_callback);
	sock->sk = NULL;

	return 0;
}

/**
 * __tipc_bind - associate or disassociate TIPC name(s) with a socket
 * @sock: socket structure
 * @skaddr: socket address describing name(s) and desired operation
 * @alen: size of socket address data structure
 *
 * Name and name sequence binding are indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 *       access any non-constant socket information.
 */
static int __tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
	struct tipc_sock *tsk = tipc_sk(sock->sk);
	bool unbind = false;

	if (unlikely(!alen))
		return tipc_sk_withdraw(tsk, NULL);

	if (ua->addrtype == TIPC_SERVICE_ADDR) {
		ua->addrtype = TIPC_SERVICE_RANGE;
		ua->sr.upper = ua->sr.lower;
	}
	if (ua->scope < 0) {
		unbind = true;
		ua->scope = -ua->scope;
	}
	/* Users may still use deprecated TIPC_ZONE_SCOPE */
	if (ua->scope != TIPC_NODE_SCOPE)
		ua->scope = TIPC_CLUSTER_SCOPE;

	if (tsk->group)
		return -EACCES;

	if (unbind)
		return tipc_sk_withdraw(tsk, ua);
	return tipc_sk_publish(tsk, ua);
}

int tipc_sk_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	int res;

	lock_sock(sock->sk);
	res = __tipc_bind(sock, skaddr, alen);
	release_sock(sock->sk);
	return res;
}

static int tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
	u32 atype = ua->addrtype;

	if (alen) {
		if (!tipc_uaddr_valid(ua, alen))
			return -EINVAL;
		if (atype == TIPC_SOCKET_ADDR)
			return -EAFNOSUPPORT;
		if (ua->sr.type < TIPC_RESERVED_TYPES) {
			pr_warn_once("Can't bind to reserved service type %u\n",
				     ua->sr.type);
			return -EACCES;
		}
	}
	return tipc_sk_bind(sock, skaddr, alen);
}
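
/* Userspace counterpart of the bind path above (illustrative sketch; the
 * service type 18888 and instance 17 are arbitrary example values):
 *
 *	struct sockaddr_tipc a = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *	};
 *	bind(sd, (struct sockaddr *)&a, sizeof(a));   // publish the name
 *	a.scope = -TIPC_CLUSTER_SCOPE;
 *	bind(sd, (struct sockaddr *)&a, sizeof(a));   // withdraw it again
 *
 * As __tipc_bind() shows, a single-instance TIPC_SERVICE_ADDR is widened
 * to a one-element TIPC_SERVICE_RANGE, and a negative scope unbinds.
 */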

/**
 * tipc_getname - get port ID of socket or peer socket
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 *
 * Return: the size of the returned address structure on success, errno
 * otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
 *       a completely predictable manner).
 */
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
			int peer)
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);

	memset(addr, 0, sizeof(*addr));
	if (peer) {
		if ((!tipc_sk_connected(sk)) &&
		    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
			return -ENOTCONN;
		addr->addr.id.ref = tsk_peer_port(tsk);
		addr->addr.id.node = tsk_peer_node(tsk);
	} else {
		addr->addr.id.ref = tsk->portid;
		addr->addr.id.node = tipc_own_addr(sock_net(sk));
	}

	addr->addrtype = TIPC_SOCKET_ADDR;
	addr->family = AF_TIPC;
	addr->scope = 0;
	addr->addr.name.domain = 0;

	return sizeof(*addr);
}

/**
 * tipc_poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: poll table
 *
 * Return: pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */
static __poll_t tipc_poll(struct file *file, struct socket *sock,
			  poll_table *wait)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	__poll_t revents = 0;

	sock_poll_wait(file, sock, wait);
	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		revents |= EPOLLHUP;

	switch (sk->sk_state) {
	case TIPC_ESTABLISHED:
		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
			revents |= EPOLLOUT;
		fallthrough;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
			revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_OPEN:
		if (tsk->group_is_open && !tsk->cong_link_cnt)
			revents |= EPOLLOUT;
		if (!tipc_sk_type_connectionless(sk))
			break;
		if (skb_queue_empty_lockless(&sk->sk_receive_queue))
			break;
		revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_DISCONNECTING:
		revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
		break;
	}
	return revents;
}

/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @ua: destination address struct
 * @msg: message to send
 * @dlen: length of data to send
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct socket *sock, struct tipc_uaddr *ua,
			  struct msghdr *msg, size_t dlen, long timeout)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	struct tipc_nlist dsts;
	int rc;

	if (tsk->group)
		return -EACCES;

	/* Block or return if any destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Lookup destination nodes */
	tipc_nlist_init(&dsts, tipc_own_addr(net));
	tipc_nametbl_lookup_mcast_nodes(net, ua, &dsts);
	if (!dsts.local && !dsts.remote)
		return -EHOSTUNREACH;

	/* Build message header */
	msg_set_type(hdr, TIPC_MCAST_MSG);
	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_nametype(hdr, ua->sr.type);
	msg_set_namelower(hdr, ua->sr.lower);
	msg_set_nameupper(hdr, ua->sr.upper);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);

	/* Send message if build was successful */
	if (unlikely(rc == dlen)) {
		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
					TIPC_DUMP_SK_SNDQ, " ");
		rc = tipc_mcast_xmit(net, &pkts, &tsk->mc_method, &dsts,
				     &tsk->cong_link_cnt);
	}

	tipc_nlist_purge(&dsts);

	return rc ? rc : dlen;
}
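
/* A multicast send like the one handled above is triggered from userspace
 * by passing a TIPC_SERVICE_RANGE destination to sendto() (illustrative
 * sketch; type 18888 and range 0-99 are arbitrary example values):
 *
 *	struct sockaddr_tipc a = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_RANGE,
 *		.addr.nameseq = { .type = 18888, .lower = 0, .upper = 99 },
 *	};
 *	sendto(sd, buf, len, 0, (struct sockaddr *)&a, sizeof(a));
 *
 * Every socket bound to an instance of type 18888 within [0,99] then
 * receives a copy; with no matching binding anywhere, the send fails
 * with EHOSTUNREACH, as in the lookup above.
 */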

/**
 * tipc_send_group_msg - send a message to a member in the group
 * @net: network namespace
 * @tsk: tipc socket
 * @m: message to send
 * @mb: group member
 * @dnode: destination node
 * @dport: destination port
 * @dlen: total length of message data
 */
static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
			       struct msghdr *m, struct tipc_member *mb,
			       u32 dnode, u32 dport, int dlen)
{
	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
	struct tipc_mc_method *method = &tsk->mc_method;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct sk_buff_head pkts;
	int mtu, rc;

	/* Complete message header */
	msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, dport);
	msg_set_destnode(hdr, dnode);
	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(&tsk->cong_links, dnode, 0);
		tsk->cong_link_cnt++;
	}

	/* Update send window */
	tipc_group_update_member(mb, blks);

	/* A broadcast sent within next EXPIRE period must follow same path */
	method->rcast = true;
	method->mandatory = true;
	return dlen;
}

/**
 * tipc_send_group_unicast - send message to a member in the group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	struct sock *sk = sock->sk;
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	struct tipc_member *mb = NULL;
	u32 node, port;
	int rc;

	node = ua->sk.node;
	port = ua->sk.ref;
	if (!port && !node)
		return -EHOSTUNREACH;

	/* Block or return if destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(&tsk->cong_links, node, 0) &&
				tsk->group &&
				!tipc_group_cong(tsk->group, node, port, blks,
						 &mb));
	if (unlikely(rc))
		return rc;

	if (unlikely(!mb))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_anycast - send message to any member with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct list_head *cong_links = &tsk->cong_links;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_member *first = NULL;
	struct tipc_member *mbr = NULL;
	struct net *net = sock_net(sk);
	u32 node, port, exclude;
	struct list_head dsts;
	int lookups = 0;
	int dstcnt, rc;
	bool cong;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);

	while (++lookups < 4) {
		exclude = tipc_group_exclude(tsk->group);

		first = NULL;

		/* Look for a non-congested destination member, if any */
		while (1) {
			if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
						       exclude, false))
				return -EHOSTUNREACH;
			tipc_dest_pop(&dsts, &node, &port);
			cong = tipc_group_cong(tsk->group, node, port, blks,
					       &mbr);
			if (!cong)
				break;
			if (mbr == first)
				break;
			if (!first)
				first = mbr;
		}

		/* Start over if destination was not in member list */
		if (unlikely(!mbr))
			continue;

		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
			break;

		/* Block or return if destination link or member is congested */
		rc = tipc_wait_for_cond(sock, &timeout,
					!tipc_dest_find(cong_links, node, 0) &&
					tsk->group &&
					!tipc_group_cong(tsk->group, node, port,
							 blks, &mbr));
		if (unlikely(rc))
			return rc;

		/* Send, unless destination disappeared while waiting */
		if (likely(mbr))
			break;
	}

	if (unlikely(lookups >= 4))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_bcast - send message to all members in communication group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_nlist *dsts;
	struct tipc_mc_method *method = &tsk->mc_method;
	bool ack = method->mandatory && method->rcast;
	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	int rc = -EHOSTUNREACH;

	/* Block or return if any destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tsk->cong_link_cnt && tsk->group &&
				!tipc_group_bc_cong(tsk->group, blks));
	if (unlikely(rc))
		return rc;

	dsts = tipc_group_dests(tsk->group);
	if (!dsts->local && !dsts->remote)
		return -EHOSTUNREACH;

	/* Complete message header */
	if (ua) {
		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
		msg_set_nameinst(hdr, ua->sa.instance);
	} else {
		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
		msg_set_nameinst(hdr, 0);
	}
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));

	/* Avoid getting stuck with repeated forced replicasts */
	msg_set_grp_bc_ack_req(hdr, ack);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Update broadcast sequence number and send windows */
	tipc_group_update_bc_members(tsk->group, blks, ack);

	/* Broadcast link is now free to choose method for next broadcast */
	method->mandatory = false;
	method->expires = jiffies;

	return dlen;
}

/**
 * tipc_send_group_mcast - send message to all members with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct list_head dsts;
	u32 dstcnt, exclude;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);
	exclude = tipc_group_exclude(grp);

	if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
		return -EHOSTUNREACH;

	if (dstcnt == 1) {
		tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
		return tipc_send_group_unicast(sock, m, dlen, timeout);
	}

	tipc_dest_list_purge(&dsts);
	return tipc_send_group_bcast(sock, m, dlen, timeout);
}

/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @net: the associated network namespace
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
 */
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
{
	u32 self = tipc_own_addr(net);
	struct sk_buff *skb, *_skb;
	u32 portid, onode;
	struct sk_buff_head tmpq;
	struct list_head dports;
	struct tipc_msg *hdr;
	struct tipc_uaddr ua;
	int user, mtyp, hlen;

	__skb_queue_head_init(&tmpq);
	INIT_LIST_HEAD(&dports);
	ua.addrtype = TIPC_SERVICE_RANGE;

	/* tipc_skb_peek() increments the head skb's reference counter */
	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		hdr = buf_msg(skb);
		user = msg_user(hdr);
		mtyp = msg_type(hdr);
		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
		onode = msg_orignode(hdr);
		ua.sr.type = msg_nametype(hdr);
		ua.sr.lower = msg_namelower(hdr);
		ua.sr.upper = msg_nameupper(hdr);
		if (onode == self)
			ua.scope = TIPC_ANY_SCOPE;
		else
			ua.scope = TIPC_CLUSTER_SCOPE;

		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
			spin_lock_bh(&inputq->lock);
			if (skb_peek(arrvq) == skb) {
				__skb_dequeue(arrvq);
				__skb_queue_tail(inputq, skb);
			}
			kfree_skb(skb);
			spin_unlock_bh(&inputq->lock);
			continue;
		}

		/* Group messages require exact scope match */
		if (msg_in_group(hdr)) {
			ua.sr.lower = 0;
			ua.sr.upper = ~0;
			ua.scope = msg_lookup_scope(hdr);
		}

		/* Create destination port list: */
		tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);

		/* Clone message per destination */
		while (tipc_dest_pop(&dports, NULL, &portid)) {
			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
		}
		/* Append clones to inputq only if skb is still head of arrvq */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			/* Decrement the skb's refcnt */
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
	}
	tipc_sk_rcv(net, inputq);
}

/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
 *                         when socket is in Nagle mode
 */
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
	struct sk_buff *skb = skb_peek_tail(txq);
	struct net *net = sock_net(&tsk->sk);
	u32 dnode = tsk_peer_node(tsk);
	int rc;

	if (nagle_ack) {
		tsk->pkt_cnt += skb_queue_len(txq);
		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
			tsk->oneway = 0;
			if (tsk->nagle_start < NAGLE_START_MAX)
				tsk->nagle_start *= 2;
			tsk->expect_ack = false;
			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
				 tsk->nagle_start);
		} else {
			tsk->nagle_start = NAGLE_START_INIT;
			if (skb) {
				msg_set_ack_required(buf_msg(skb));
				tsk->expect_ack = true;
			} else {
				tsk->expect_ack = false;
			}
		}
		tsk->msg_acc = 0;
		tsk->pkt_cnt = 0;
	}

	if (!skb || tsk->cong_link_cnt)
		return;

	/* Do not send SYN again after congestion */
	if (msg_is_syn(buf_msg(skb)))
		return;

	if (tsk->msg_acc)
		tsk->pkt_cnt += skb_queue_len(txq);
	tsk->snt_unacked += tsk->snd_backlog;
	tsk->snd_backlog = 0;
	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
	if (rc == -ELINKCONG)
		tsk->cong_link_cnt = 1;
}
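
/* The adaptive Nagle backoff above, worked through (an editorial sketch
 * based on NAGLE_START_INIT = 4 and NAGLE_START_MAX = 1024): whenever an
 * ack shows that bundling was ineffective (fewer than two accepted
 * messages per sent packet on average), nagle_start doubles,
 * 4 -> 8 -> 16 -> ... -> 1024, so the sender's next one-way run must
 * reach that many messages (tsk->oneway in __tipc_sendstream()) before
 * bundling is attempted again. An effective round resets it to 4.
 */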

/**
 * tipc_sk_conn_proto_rcv - receive a connection management protocol message
 * @tsk: receiving socket
 * @skb: pointer to message buffer.
 * @inputq: buffer list containing the buffers
 * @xmitq: output message area
 */
static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	u32 onode = tsk_own_node(tsk);
	struct sock *sk = &tsk->sk;
	int mtyp = msg_type(hdr);
	bool was_cong;

	/* Ignore if connection cannot be validated: */
	if (!tsk_peer_msg(tsk, hdr)) {
		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
		goto exit;
	}

	if (unlikely(msg_errcode(hdr))) {
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
				      tsk_peer_port(tsk));
		sk->sk_state_change(sk);

		/* State change is ignored if socket already awake,
		 * - convert msg to abort msg and add to inqueue
		 */
		msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
		msg_set_type(hdr, TIPC_CONN_MSG);
		msg_set_size(hdr, BASIC_H_SIZE);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
		__skb_queue_tail(inputq, skb);
		return;
	}

	tsk->probe_unacked = false;

	if (mtyp == CONN_PROBE) {
		msg_set_type(hdr, CONN_PROBE_REPLY);
		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
			__skb_queue_tail(xmitq, skb);
		return;
	} else if (mtyp == CONN_ACK) {
		was_cong = tsk_conn_cong(tsk);
		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
		tsk->snt_unacked -= msg_conn_ack(hdr);
		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
			tsk->snd_win = msg_adv_win(hdr);
		if (was_cong && !tsk_conn_cong(tsk))
			sk->sk_write_space(sk);
	} else if (mtyp != CONN_PROBE_REPLY) {
		pr_warn("Received unknown CONN_PROTO msg\n");
	}
exit:
	kfree_skb(skb);
}

/**
 * tipc_sendmsg - send message in connectionless manner
 * @sock: socket structure
 * @m: message to send
 * @dsz: amount of user data to be sent
 *
 * Message must have a destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */
static int tipc_sendmsg(struct socket *sock,
			struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendmsg(sock, m, dsz);
	release_sock(sk);

	return ret;
}

static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct list_head *clinks = &tsk->cong_links;
	bool syn = !tipc_sk_type_connectionless(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_socket_addr skaddr;
	struct sk_buff_head pkts;
	int atype, mtu, rc;

	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
		return -EMSGSIZE;

	if (ua) {
		if (!tipc_uaddr_valid(ua, m->msg_namelen))
			return -EINVAL;
		atype = ua->addrtype;
	}

	/* If socket belongs to a communication group follow other paths */
	if (grp) {
		if (!ua)
			return tipc_send_group_bcast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_ADDR)
			return tipc_send_group_anycast(sock, m, dlen, timeout);
		if (atype == TIPC_SOCKET_ADDR)
			return tipc_send_group_unicast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_RANGE)
			return tipc_send_group_mcast(sock, m, dlen, timeout);
		return -EINVAL;
	}

	if (!ua) {
		ua = (struct tipc_uaddr *)&tsk->peer;
		if (!syn && ua->family != AF_TIPC)
			return -EDESTADDRREQ;
		atype = ua->addrtype;
	}

	if (unlikely(syn)) {
		if (sk->sk_state == TIPC_LISTEN)
			return -EPIPE;
		if (sk->sk_state != TIPC_OPEN)
			return -EISCONN;
		if (tsk->published)
			return -EOPNOTSUPP;
		if (atype == TIPC_SERVICE_ADDR)
			tsk->conn_addrtype = atype;
		msg_set_syn(hdr, 1);
	}

	memset(&skaddr, 0, sizeof(skaddr));

	/* Determine destination */
	if (atype == TIPC_SERVICE_RANGE) {
		return tipc_sendmcast(sock, ua, m, dlen, timeout);
	} else if (atype == TIPC_SERVICE_ADDR) {
		skaddr.node = ua->lookup_node;
		ua->scope = tipc_node2scope(skaddr.node);
		if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
			return -EHOSTUNREACH;
	} else if (atype == TIPC_SOCKET_ADDR) {
		skaddr = ua->sk;
	} else {
		return -EINVAL;
	}

	/* Block or return if destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(clinks, skaddr.node, 0));
	if (unlikely(rc))
		return rc;

	/* Finally build message header */
	msg_set_destnode(hdr, skaddr.node);
	msg_set_destport(hdr, skaddr.ref);
	if (atype == TIPC_SERVICE_ADDR) {
		msg_set_type(hdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
		msg_set_nametype(hdr, ua->sa.type);
		msg_set_nameinst(hdr, ua->sa.instance);
		msg_set_lookup_scope(hdr, ua->scope);
	} else { /* TIPC_SOCKET_ADDR */
		msg_set_type(hdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(hdr, 0);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
	}

	/* Add message body */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;
	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
		__skb_queue_purge(&pkts);
		return -ENOMEM;
	}

	/* Send message */
	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
	rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(clinks, skaddr.node, 0);
		tsk->cong_link_cnt++;
		rc = 0;
	}

	if (unlikely(syn && !rc)) {
		tipc_set_sk_state(sk, TIPC_CONNECTING);
		if (dlen && timeout) {
			timeout = msecs_to_jiffies(timeout);
			tipc_wait_for_connect(sock, &timeout);
		}
	}

	return rc ? rc : dlen;
}
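
/* The TIPC_SERVICE_ADDR branch above is what an anycast datagram from
 * userspace exercises (illustrative sketch; the type/instance values are
 * arbitrary examples):
 *
 *	struct sockaddr_tipc a = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *		.addr.name.domain = 0,   // 0 = cluster-wide lookup
 *	};
 *	sendto(sd, buf, len, 0, (struct sockaddr *)&a, sizeof(a));
 *
 * The name table lookup (tipc_nametbl_lookup_anycast) selects one socket
 * bound to {18888, 17}; with no match the send fails with EHOSTUNREACH.
 */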

/**
 * tipc_sendstream - send stream-oriented data
 * @sock: socket structure
 * @m: data to send
 * @dsz: total length of data to be transmitted
 *
 * Used for SOCK_STREAM data.
 *
 * Return: the number of bytes sent on success (or partial success),
 * or errno if no data sent
 */
static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendstream(sock, m, dsz);
	release_sock(sk);

	return ret;
}

static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
{
	struct sock *sk = sock->sk;
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct sk_buff_head *txq = &sk->sk_write_queue;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct sk_buff *skb;
	u32 dnode = tsk_peer_node(tsk);
	int maxnagle = tsk->maxnagle;
	int maxpkt = tsk->max_pkt;
	int send, sent = 0;
	int blocks, rc = 0;

	if (unlikely(dlen > INT_MAX))
		return -EMSGSIZE;

	/* Handle implicit connection setup */
	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
		rc = __tipc_sendmsg(sock, m, dlen);
		if (dlen && dlen == rc) {
			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
		}
		return rc;
	}

	do {
		rc = tipc_wait_for_cond(sock, &timeout,
					(!tsk->cong_link_cnt &&
					 !tsk_conn_cong(tsk) &&
					 tipc_sk_connected(sk)));
		if (unlikely(rc))
			break;
		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
		blocks = tsk->snd_backlog;
		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
		    send <= maxnagle) {
			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
			if (unlikely(rc < 0))
				break;
			blocks += rc;
			tsk->msg_acc++;
			if (blocks <= 64 && tsk->expect_ack) {
				tsk->snd_backlog = blocks;
				sent += send;
				break;
			} else if (blocks > 64) {
				tsk->pkt_cnt += skb_queue_len(txq);
			} else {
				skb = skb_peek_tail(txq);
				if (skb) {
					msg_set_ack_required(buf_msg(skb));
					tsk->expect_ack = true;
				} else {
					tsk->expect_ack = false;
				}
				tsk->msg_acc = 0;
				tsk->pkt_cnt = 0;
			}
		} else {
			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
			if (unlikely(rc != send))
				break;
			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
		}
		trace_tipc_sk_sendstream(sk, skb_peek(txq),
					 TIPC_DUMP_SK_SNDQ, " ");
		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
		if (unlikely(rc == -ELINKCONG)) {
			tsk->cong_link_cnt = 1;
			rc = 0;
		}
		if (likely(!rc)) {
			tsk->snt_unacked += blocks;
			tsk->snd_backlog = 0;
			sent += send;
		}
	} while (sent < dlen && !rc);

	return sent ? sent : rc;
}
1640  
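/* Example: sending on a connected SOCK_STREAM socket from user space.
 * A minimal sketch, assuming only the user-space API in <sys/socket.h>;
 * stream_send() is a hypothetical helper and error handling is trimmed.
 *
 *	#include <sys/socket.h>
 *
 *	void stream_send(int sd, const char *buf, size_t len)
 *	{
 *		size_t sent = 0;
 *
 *		while (sent < len) {
 *			ssize_t n = send(sd, buf + sent, len - sent, 0);
 *
 *			if (n < 0)
 *				break;		// check errno in real code
 *			sent += n;		// partial sends are possible
 *		}
 *	}
 */
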
1641  /**
1642   * tipc_send_packet - send a connection-oriented message
1643   * @sock: socket structure
1644   * @m: message to send
1645   * @dsz: length of data to be transmitted
1646   *
1647   * Used for SOCK_SEQPACKET messages.
1648   *
1649   * Return: the number of bytes sent on success, or errno otherwise
1650   */
1651  static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1652  {
1653  	if (dsz > TIPC_MAX_USER_MSG_SIZE)
1654  		return -EMSGSIZE;
1655  
1656  	return tipc_sendstream(sock, m, dsz);
1657  }
1658  
1659  /* tipc_sk_finish_conn - complete the setup of a connection
1660   */
1661  static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1662  				u32 peer_node)
1663  {
1664  	struct sock *sk = &tsk->sk;
1665  	struct net *net = sock_net(sk);
1666  	struct tipc_msg *msg = &tsk->phdr;
1667  
1668  	msg_set_syn(msg, 0);
1669  	msg_set_destnode(msg, peer_node);
1670  	msg_set_destport(msg, peer_port);
1671  	msg_set_type(msg, TIPC_CONN_MSG);
1672  	msg_set_lookup_scope(msg, 0);
1673  	msg_set_hdr_sz(msg, SHORT_H_SIZE);
1674  
1675  	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1676  	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1677  	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1678  	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
1679  	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1680  	tsk_set_nagle(tsk);
1681  	__skb_queue_purge(&sk->sk_write_queue);
1682  	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1683  		return;
1684  
1685  	/* Fall back to message based flow control */
1686  	tsk->rcv_win = FLOWCTL_MSG_WIN;
1687  	tsk->snd_win = FLOWCTL_MSG_WIN;
1688  }
1689  
1690  /**
1691   * tipc_sk_set_orig_addr - capture sender's address for received message
1692   * @m: descriptor for message info
1693   * @skb: received message
1694   *
1695   * Note: Address is not captured if not requested by receiver.
1696   */
1697  static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1698  {
1699  	DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1700  	struct tipc_msg *hdr = buf_msg(skb);
1701  
1702  	if (!srcaddr)
1703  		return;
1704  
1705  	srcaddr->sock.family = AF_TIPC;
1706  	srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
1707  	srcaddr->sock.scope = 0;
1708  	srcaddr->sock.addr.id.ref = msg_origport(hdr);
1709  	srcaddr->sock.addr.id.node = msg_orignode(hdr);
1710  	srcaddr->sock.addr.name.domain = 0;
1711  	m->msg_namelen = sizeof(struct sockaddr_tipc);
1712  
1713  	if (!msg_in_group(hdr))
1714  		return;
1715  
1716  	/* Group message users may also want to know sending member's id */
1717  	srcaddr->member.family = AF_TIPC;
1718  	srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
1719  	srcaddr->member.scope = 0;
1720  	srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1721  	srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1722  	srcaddr->member.addr.name.domain = 0;
1723  	m->msg_namelen = sizeof(*srcaddr);
1724  }
1725  
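/* Example: retrieving both addresses filled in above from user space.
 * A minimal sketch; an array of two struct sockaddr_tipc mirrors the
 * struct sockaddr_pair layout, and sd/buf are assumed to exist.
 *
 *	struct sockaddr_tipc pair[2];
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = {
 *		.msg_name = pair, .msg_namelen = sizeof(pair),
 *		.msg_iov = &iov, .msg_iovlen = 1,
 *	};
 *
 *	if (recvmsg(sd, &m, 0) >= 0 && m.msg_namelen == sizeof(pair)) {
 *		// group message: pair[1].addr.name.name.instance is the
 *		// sending member's id, pair[1].addr.name.name.type its type
 *	}
 */
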
1726  /**
1727   * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1728   * @m: descriptor for message info
1729   * @skb: received message buffer
1730   * @tsk: TIPC port associated with message
1731   *
1732   * Note: Ancillary data is not captured if not requested by receiver.
1733   *
1734   * Return: 0 if successful, otherwise errno
1735   */
1736  static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
1737  				 struct tipc_sock *tsk)
1738  {
1739  	struct tipc_msg *hdr;
1740  	u32 data[3] = {0,};
1741  	bool has_addr;
1742  	int dlen, rc;
1743  
1744  	if (likely(m->msg_controllen == 0))
1745  		return 0;
1746  
1747  	hdr = buf_msg(skb);
1748  	dlen = msg_data_sz(hdr);
1749  
1750  	/* Capture errored message object, if any */
1751  	if (msg_errcode(hdr)) {
1752  		if (skb_linearize(skb))
1753  			return -ENOMEM;
1754  		hdr = buf_msg(skb);
1755  		data[0] = msg_errcode(hdr);
1756  		data[1] = dlen;
1757  		rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
1758  		if (rc || !dlen)
1759  			return rc;
1760  		rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
1761  		if (rc)
1762  			return rc;
1763  	}
1764  
1765  	/* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
1766  	switch (msg_type(hdr)) {
1767  	case TIPC_NAMED_MSG:
1768  		has_addr = true;
1769  		data[0] = msg_nametype(hdr);
1770  		data[1] = msg_namelower(hdr);
1771  		data[2] = data[1];
1772  		break;
1773  	case TIPC_MCAST_MSG:
1774  		has_addr = true;
1775  		data[0] = msg_nametype(hdr);
1776  		data[1] = msg_namelower(hdr);
1777  		data[2] = msg_nameupper(hdr);
1778  		break;
1779  	case TIPC_CONN_MSG:
1780  		has_addr = !!tsk->conn_addrtype;
1781  		data[0] = msg_nametype(&tsk->phdr);
1782  		data[1] = msg_nameinst(&tsk->phdr);
1783  		data[2] = data[1];
1784  		break;
1785  	default:
1786  		has_addr = false;
1787  	}
1788  	if (!has_addr)
1789  		return 0;
1790  	return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
1791  }
1792  
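/* Example: picking up the ancillary data produced above. A minimal
 * user-space sketch, assuming <linux/tipc.h> for the SOL_TIPC cmsg
 * types; buffer sizes are arbitrary.
 *
 *	char data[256], ctrl[256];
 *	struct iovec iov = { data, sizeof(data) };
 *	struct msghdr m = {
 *		.msg_iov = &iov, .msg_iovlen = 1,
 *		.msg_control = ctrl, .msg_controllen = sizeof(ctrl),
 *	};
 *	struct cmsghdr *cm;
 *
 *	recvmsg(sd, &m, 0);
 *	for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm)) {
 *		__u32 *d = (__u32 *)CMSG_DATA(cm);
 *
 *		if (cm->cmsg_level != SOL_TIPC)
 *			continue;
 *		if (cm->cmsg_type == TIPC_DESTNAME) {
 *			// d[0] = service type, d[1] = lower, d[2] = upper
 *		} else if (cm->cmsg_type == TIPC_ERRINFO) {
 *			// d[0] = TIPC error code, d[1] = returned data length
 *		}
 *	}
 */
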
1793  static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
1794  {
1795  	struct sock *sk = &tsk->sk;
1796  	struct sk_buff *skb = NULL;
1797  	struct tipc_msg *msg;
1798  	u32 peer_port = tsk_peer_port(tsk);
1799  	u32 dnode = tsk_peer_node(tsk);
1800  
1801  	if (!tipc_sk_connected(sk))
1802  		return NULL;
1803  	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1804  			      dnode, tsk_own_node(tsk), peer_port,
1805  			      tsk->portid, TIPC_OK);
1806  	if (!skb)
1807  		return NULL;
1808  	msg = buf_msg(skb);
1809  	msg_set_conn_ack(msg, tsk->rcv_unacked);
1810  	tsk->rcv_unacked = 0;
1811  
1812  	/* Adjust to and advertise the correct window limit */
1813  	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1814  		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1815  		msg_set_adv_win(msg, tsk->rcv_win);
1816  	}
1817  	return skb;
1818  }
1819  
1820  static void tipc_sk_send_ack(struct tipc_sock *tsk)
1821  {
1822  	struct sk_buff *skb;
1823  
1824  	skb = tipc_sk_build_ack(tsk);
1825  	if (!skb)
1826  		return;
1827  
1828  	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
1829  			   msg_link_selector(buf_msg(skb)));
1830  }
1831  
1832  static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1833  {
1834  	struct sock *sk = sock->sk;
1835  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
1836  	long timeo = *timeop;
1837  	int err = sock_error(sk);
1838  
1839  	if (err)
1840  		return err;
1841  
1842  	for (;;) {
1843  		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1844  			if (sk->sk_shutdown & RCV_SHUTDOWN) {
1845  				err = -ENOTCONN;
1846  				break;
1847  			}
1848  			add_wait_queue(sk_sleep(sk), &wait);
1849  			release_sock(sk);
1850  			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
1851  			sched_annotate_sleep();
1852  			lock_sock(sk);
1853  			remove_wait_queue(sk_sleep(sk), &wait);
1854  		}
1855  		err = 0;
1856  		if (!skb_queue_empty(&sk->sk_receive_queue))
1857  			break;
1858  		err = -EAGAIN;
1859  		if (!timeo)
1860  			break;
1861  		err = sock_intr_errno(timeo);
1862  		if (signal_pending(current))
1863  			break;
1864  
1865  		err = sock_error(sk);
1866  		if (err)
1867  			break;
1868  	}
1869  	*timeop = timeo;
1870  	return err;
1871  }
1872  
1873  /**
1874   * tipc_recvmsg - receive packet-oriented message
1875   * @sock: network socket
1876   * @m: descriptor for message info
1877   * @buflen: length of user buffer area
1878   * @flags: receive flags
1879   *
1880   * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1881   * If the complete message doesn't fit in user area, truncate it.
1882   *
1883   * Return: size of returned message data, errno otherwise
1884   */
1885  static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1886  			size_t buflen, int flags)
1887  {
1888  	struct sock *sk = sock->sk;
1889  	bool connected = !tipc_sk_type_connectionless(sk);
1890  	struct tipc_sock *tsk = tipc_sk(sk);
1891  	int rc, err, hlen, dlen, copy;
1892  	struct tipc_skb_cb *skb_cb;
1893  	struct sk_buff_head xmitq;
1894  	struct tipc_msg *hdr;
1895  	struct sk_buff *skb;
1896  	bool grp_evt;
1897  	long timeout;
1898  
1899  	/* Catch invalid receive requests */
1900  	if (unlikely(!buflen))
1901  		return -EINVAL;
1902  
1903  	lock_sock(sk);
1904  	if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
1905  		rc = -ENOTCONN;
1906  		goto exit;
1907  	}
1908  	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1909  
1910  	/* Step rcv queue to first msg with data or error; wait if necessary */
1911  	do {
1912  		rc = tipc_wait_for_rcvmsg(sock, &timeout);
1913  		if (unlikely(rc))
1914  			goto exit;
1915  		skb = skb_peek(&sk->sk_receive_queue);
1916  		skb_cb = TIPC_SKB_CB(skb);
1917  		hdr = buf_msg(skb);
1918  		dlen = msg_data_sz(hdr);
1919  		hlen = msg_hdr_sz(hdr);
1920  		err = msg_errcode(hdr);
1921  		grp_evt = msg_is_grp_evt(hdr);
1922  		if (likely(dlen || err))
1923  			break;
1924  		tsk_advance_rx_queue(sk);
1925  	} while (1);
1926  
1927  	/* Collect msg meta data, including error code and rejected data */
1928  	tipc_sk_set_orig_addr(m, skb);
1929  	rc = tipc_sk_anc_data_recv(m, skb, tsk);
1930  	if (unlikely(rc))
1931  		goto exit;
1932  	hdr = buf_msg(skb);
1933  
1934  	/* Capture data if non-error msg, otherwise just set return value */
1935  	if (likely(!err)) {
1936  		int offset = skb_cb->bytes_read;
1937  
1938  		copy = min_t(int, dlen - offset, buflen);
1939  		rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
1940  		if (unlikely(rc))
1941  			goto exit;
1942  		if (unlikely(offset + copy < dlen)) {
1943  			if (flags & MSG_EOR) {
1944  				if (!(flags & MSG_PEEK))
1945  					skb_cb->bytes_read = offset + copy;
1946  			} else {
1947  				m->msg_flags |= MSG_TRUNC;
1948  				skb_cb->bytes_read = 0;
1949  			}
1950  		} else {
1951  			if (flags & MSG_EOR)
1952  				m->msg_flags |= MSG_EOR;
1953  			skb_cb->bytes_read = 0;
1954  		}
1955  	} else {
1956  		copy = 0;
1957  		rc = 0;
1958  		if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
1959  			rc = -ECONNRESET;
1960  			goto exit;
1961  		}
1962  	}
1963  
1964  	/* Mark message as group event if applicable */
1965  	if (unlikely(grp_evt)) {
1966  		if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1967  			m->msg_flags |= MSG_EOR;
1968  		m->msg_flags |= MSG_OOB;
1969  		copy = 0;
1970  	}
1971  
1972  	/* Capture of data or error code/rejected data was successful */
1973  	if (unlikely(flags & MSG_PEEK))
1974  		goto exit;
1975  
1976  	/* Send group flow control advertisement when applicable */
1977  	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1978  		__skb_queue_head_init(&xmitq);
1979  		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1980  					  msg_orignode(hdr), msg_origport(hdr),
1981  					  &xmitq);
1982  		tipc_node_distr_xmit(sock_net(sk), &xmitq);
1983  	}
1984  
1985  	if (skb_cb->bytes_read)
1986  		goto exit;
1987  
1988  	tsk_advance_rx_queue(sk);
1989  
1990  	if (likely(!connected))
1991  		goto exit;
1992  
1993  	/* Send connection flow control advertisement when applicable */
1994  	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1995  	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1996  		tipc_sk_send_ack(tsk);
1997  exit:
1998  	release_sock(sk);
1999  	return rc ? rc : copy;
2000  }
2001  
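/* Example: message-oriented receive on a SOCK_RDM socket. A minimal
 * user-space sketch; as described above, a message larger than the
 * buffer is truncated and MSG_TRUNC is raised in msg_flags.
 *
 *	char buf[1024];
 *	struct sockaddr_tipc src;
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = {
 *		.msg_name = &src, .msg_namelen = sizeof(src),
 *		.msg_iov = &iov, .msg_iovlen = 1,
 *	};
 *	ssize_t n = recvmsg(sd, &m, 0);
 *
 *	if (n >= 0 && (m.msg_flags & MSG_TRUNC)) {
 *		// the tail of the message was dropped
 *	}
 */
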
2002  /**
2003   * tipc_recvstream - receive stream-oriented data
2004   * @sock: network socket
2005   * @m: descriptor for message info
2006   * @buflen: total size of user buffer area
2007   * @flags: receive flags
2008   *
2009   * Used for SOCK_STREAM messages only. If not enough data is available,
2010   * this call will optionally wait for more; it never truncates data.
2011   *
2012   * Return: size of returned message data, errno otherwise
2013   */
2014  static int tipc_recvstream(struct socket *sock, struct msghdr *m,
2015  			   size_t buflen, int flags)
2016  {
2017  	struct sock *sk = sock->sk;
2018  	struct tipc_sock *tsk = tipc_sk(sk);
2019  	struct sk_buff *skb;
2020  	struct tipc_msg *hdr;
2021  	struct tipc_skb_cb *skb_cb;
2022  	bool peek = flags & MSG_PEEK;
2023  	int offset, required, copy, copied = 0;
2024  	int hlen, dlen, err, rc;
2025  	long timeout;
2026  
2027  	/* Catch invalid receive attempts */
2028  	if (unlikely(!buflen))
2029  		return -EINVAL;
2030  
2031  	lock_sock(sk);
2032  
2033  	if (unlikely(sk->sk_state == TIPC_OPEN)) {
2034  		rc = -ENOTCONN;
2035  		goto exit;
2036  	}
2037  	required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
2038  	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
2039  
2040  	do {
2041  		/* Look at first msg in receive queue; wait if necessary */
2042  		rc = tipc_wait_for_rcvmsg(sock, &timeout);
2043  		if (unlikely(rc))
2044  			break;
2045  		skb = skb_peek(&sk->sk_receive_queue);
2046  		skb_cb = TIPC_SKB_CB(skb);
2047  		hdr = buf_msg(skb);
2048  		dlen = msg_data_sz(hdr);
2049  		hlen = msg_hdr_sz(hdr);
2050  		err = msg_errcode(hdr);
2051  
2052  		/* Discard any empty non-errored (SYN-) message */
2053  		if (unlikely(!dlen && !err)) {
2054  			tsk_advance_rx_queue(sk);
2055  			continue;
2056  		}
2057  
2058  		/* Collect msg meta data, incl. error code and rejected data */
2059  		if (!copied) {
2060  			tipc_sk_set_orig_addr(m, skb);
2061  			rc = tipc_sk_anc_data_recv(m, skb, tsk);
2062  			if (rc)
2063  				break;
2064  			hdr = buf_msg(skb);
2065  		}
2066  
2067  		/* Copy data if msg ok, otherwise return error/partial data */
2068  		if (likely(!err)) {
2069  			offset = skb_cb->bytes_read;
2070  			copy = min_t(int, dlen - offset, buflen - copied);
2071  			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
2072  			if (unlikely(rc))
2073  				break;
2074  			copied += copy;
2075  			offset += copy;
2076  			if (unlikely(offset < dlen)) {
2077  				if (!peek)
2078  					skb_cb->bytes_read = offset;
2079  				break;
2080  			}
2081  		} else {
2082  			rc = 0;
2083  			if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
2084  				rc = -ECONNRESET;
2085  			if (copied || rc)
2086  				break;
2087  		}
2088  
2089  		if (unlikely(peek))
2090  			break;
2091  
2092  		tsk_advance_rx_queue(sk);
2093  
2094  		/* Send connection flow control advertisement when applicable */
2095  		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
2096  		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
2097  			tipc_sk_send_ack(tsk);
2098  
2099  		/* Exit if all requested data or FIN/error received */
2100  		if (copied == buflen || err)
2101  			break;
2102  
2103  	} while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
2104  exit:
2105  	release_sock(sk);
2106  	return copied ? copied : rc;
2107  }
2108  
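/* Example: reading an exact amount from a SOCK_STREAM socket. A minimal
 * user-space sketch; MSG_WAITALL raises the low-water mark to the buffer
 * size, so the call only returns early on error or shutdown.
 *
 *	char buf[512];
 *	ssize_t n = recv(sd, buf, sizeof(buf), MSG_WAITALL);
 *
 *	if (n == (ssize_t)sizeof(buf)) {
 *		// a full 512-byte record arrived; data is never truncated
 *	}
 */
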
2109  /**
2110   * tipc_write_space - wake up thread if port congestion is released
2111   * @sk: socket
2112   */
2113  static void tipc_write_space(struct sock *sk)
2114  {
2115  	struct socket_wq *wq;
2116  
2117  	rcu_read_lock();
2118  	wq = rcu_dereference(sk->sk_wq);
2119  	if (skwq_has_sleeper(wq))
2120  		wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
2121  						EPOLLWRNORM | EPOLLWRBAND);
2122  	rcu_read_unlock();
2123  }
2124  
2125  /**
2126   * tipc_data_ready - wake up threads to indicate messages have been received
2127   * @sk: socket
2128   */
2129  static void tipc_data_ready(struct sock *sk)
2130  {
2131  	struct socket_wq *wq;
2132  
2133  	rcu_read_lock();
2134  	wq = rcu_dereference(sk->sk_wq);
2135  	if (skwq_has_sleeper(wq))
2136  		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
2137  						EPOLLRDNORM | EPOLLRDBAND);
2138  	rcu_read_unlock();
2139  }
2140  
2141  static void tipc_sock_destruct(struct sock *sk)
2142  {
2143  	__skb_queue_purge(&sk->sk_receive_queue);
2144  }
2145  
2146  static void tipc_sk_proto_rcv(struct sock *sk,
2147  			      struct sk_buff_head *inputq,
2148  			      struct sk_buff_head *xmitq)
2149  {
2150  	struct sk_buff *skb = __skb_dequeue(inputq);
2151  	struct tipc_sock *tsk = tipc_sk(sk);
2152  	struct tipc_msg *hdr = buf_msg(skb);
2153  	struct tipc_group *grp = tsk->group;
2154  	bool wakeup = false;
2155  
2156  	switch (msg_user(hdr)) {
2157  	case CONN_MANAGER:
2158  		tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
2159  		return;
2160  	case SOCK_WAKEUP:
2161  		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
2162  		/* coupled with smp_rmb() in tipc_wait_for_cond() */
2163  		smp_wmb();
2164  		tsk->cong_link_cnt--;
2165  		wakeup = true;
2166  		tipc_sk_push_backlog(tsk, false);
2167  		break;
2168  	case GROUP_PROTOCOL:
2169  		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
2170  		break;
2171  	case TOP_SRV:
2172  		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
2173  				      hdr, inputq, xmitq);
2174  		break;
2175  	default:
2176  		break;
2177  	}
2178  
2179  	if (wakeup)
2180  		sk->sk_write_space(sk);
2181  
2182  	kfree_skb(skb);
2183  }
2184  
2185  /**
2186   * tipc_sk_filter_connect - check incoming message for a connection-based socket
2187   * @tsk: TIPC socket
2188   * @skb: pointer to message buffer.
2189   * @xmitq: for Nagle ACK if any
2190   * Return: true if message should be added to receive queue, false otherwise
2191   */
2192  static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
2193  				   struct sk_buff_head *xmitq)
2194  {
2195  	struct sock *sk = &tsk->sk;
2196  	struct net *net = sock_net(sk);
2197  	struct tipc_msg *hdr = buf_msg(skb);
2198  	bool con_msg = msg_connected(hdr);
2199  	u32 pport = tsk_peer_port(tsk);
2200  	u32 pnode = tsk_peer_node(tsk);
2201  	u32 oport = msg_origport(hdr);
2202  	u32 onode = msg_orignode(hdr);
2203  	int err = msg_errcode(hdr);
2204  	unsigned long delay;
2205  
2206  	if (unlikely(msg_mcast(hdr)))
2207  		return false;
2208  	tsk->oneway = 0;
2209  
2210  	switch (sk->sk_state) {
2211  	case TIPC_CONNECTING:
2212  		/* Setup ACK */
2213  		if (likely(con_msg)) {
2214  			if (err)
2215  				break;
2216  			tipc_sk_finish_conn(tsk, oport, onode);
2217  			msg_set_importance(&tsk->phdr, msg_importance(hdr));
2218  			/* ACK+ message with data is added to receive queue */
2219  			if (msg_data_sz(hdr))
2220  				return true;
2221  			/* Empty ACK- message: wake up sleeping connect() and drop */
2222  			sk->sk_state_change(sk);
2223  			msg_set_dest_droppable(hdr, 1);
2224  			return false;
2225  		}
2226  		/* Ignore connectionless message if not from listening socket */
2227  		if (oport != pport || onode != pnode)
2228  			return false;
2229  
2230  		/* Rejected SYN */
2231  		if (err != TIPC_ERR_OVERLOAD)
2232  			break;
2233  
2234  		/* Prepare for new setup attempt if we have a SYN clone */
2235  		if (skb_queue_empty(&sk->sk_write_queue))
2236  			break;
2237  		get_random_bytes(&delay, 2);
2238  		delay %= (tsk->conn_timeout / 4);
2239  		delay = msecs_to_jiffies(delay + 100);
2240  		sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
2241  		return false;
2242  	case TIPC_OPEN:
2243  	case TIPC_DISCONNECTING:
2244  		return false;
2245  	case TIPC_LISTEN:
2246  		/* Accept only SYN message */
2247  		if (!msg_is_syn(hdr) &&
2248  		    tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
2249  			return false;
2250  		if (!con_msg && !err)
2251  			return true;
2252  		return false;
2253  	case TIPC_ESTABLISHED:
2254  		if (!skb_queue_empty(&sk->sk_write_queue))
2255  			tipc_sk_push_backlog(tsk, false);
2256  		/* Accept only connection-based messages sent by peer */
2257  		if (likely(con_msg && !err && pport == oport &&
2258  			   pnode == onode)) {
2259  			if (msg_ack_required(hdr)) {
2260  				struct sk_buff *skb;
2261  
2262  				skb = tipc_sk_build_ack(tsk);
2263  				if (skb) {
2264  					msg_set_nagle_ack(buf_msg(skb));
2265  					__skb_queue_tail(xmitq, skb);
2266  				}
2267  			}
2268  			return true;
2269  		}
2270  		if (!tsk_peer_msg(tsk, hdr))
2271  			return false;
2272  		if (!err)
2273  			return true;
2274  		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2275  		tipc_node_remove_conn(net, pnode, tsk->portid);
2276  		sk->sk_state_change(sk);
2277  		return true;
2278  	default:
2279  		pr_err("Unknown sk_state %u\n", sk->sk_state);
2280  	}
2281  	/* Abort connection setup attempt */
2282  	tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2283  	sk->sk_err = ECONNREFUSED;
2284  	sk->sk_state_change(sk);
2285  	return true;
2286  }
2287  
2288  /**
2289   * rcvbuf_limit - get proper overload limit of socket receive queue
2290   * @sk: socket
2291   * @skb: message
2292   *
2293   * For connection-oriented messages, irrespective of importance,
2294   * default queue limit is 2 MB.
2295   *
2296   * For connectionless messages, queue limits are based on message
2297   * importance as follows:
2298   *
2299   * TIPC_LOW_IMPORTANCE       (2 MB)
2300   * TIPC_MEDIUM_IMPORTANCE    (4 MB)
2301   * TIPC_HIGH_IMPORTANCE      (8 MB)
2302   * TIPC_CRITICAL_IMPORTANCE  (16 MB)
2303   *
2304   * Return: overload limit according to corresponding message importance
2305   */
2306  static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
2307  {
2308  	struct tipc_sock *tsk = tipc_sk(sk);
2309  	struct tipc_msg *hdr = buf_msg(skb);
2310  
2311  	if (unlikely(msg_in_group(hdr)))
2312  		return READ_ONCE(sk->sk_rcvbuf);
2313  
2314  	if (unlikely(!msg_connected(hdr)))
2315  		return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
2316  
2317  	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
2318  		return READ_ONCE(sk->sk_rcvbuf);
2319  
2320  	return FLOWCTL_MSG_LIM;
2321  }
2322  
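/* Worked example of the connectionless case above, assuming the
 * documented 2 MB default for sk_rcvbuf: with TIPC_HIGH_IMPORTANCE == 2,
 *
 *	limit = sk_rcvbuf << importance = 2 MB << 2 = 8 MB,
 *
 * which matches the table in the comment.
 */
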
2323  /**
2324   * tipc_sk_filter_rcv - validate incoming message
2325   * @sk: socket
2326   * @skb: pointer to message.
2327   * @xmitq: output message area (FIXME)
2328   *
2329   * Enqueues message on receive queue if acceptable; optionally handles
2330   * disconnect indication for a connected socket.
2331   *
2332   * Called with socket lock already taken
2333   */
2334  static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2335  			       struct sk_buff_head *xmitq)
2336  {
2337  	bool sk_conn = !tipc_sk_type_connectionless(sk);
2338  	struct tipc_sock *tsk = tipc_sk(sk);
2339  	struct tipc_group *grp = tsk->group;
2340  	struct tipc_msg *hdr = buf_msg(skb);
2341  	struct net *net = sock_net(sk);
2342  	struct sk_buff_head inputq;
2343  	int mtyp = msg_type(hdr);
2344  	int limit, err = TIPC_OK;
2345  
2346  	trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
2347  	TIPC_SKB_CB(skb)->bytes_read = 0;
2348  	__skb_queue_head_init(&inputq);
2349  	__skb_queue_tail(&inputq, skb);
2350  
2351  	if (unlikely(!msg_isdata(hdr)))
2352  		tipc_sk_proto_rcv(sk, &inputq, xmitq);
2353  
2354  	if (unlikely(grp))
2355  		tipc_group_filter_msg(grp, &inputq, xmitq);
2356  
2357  	if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
2358  		tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
2359  
2360  	/* Validate and add to receive buffer if there is space */
2361  	while ((skb = __skb_dequeue(&inputq))) {
2362  		hdr = buf_msg(skb);
2363  		limit = rcvbuf_limit(sk, skb);
2364  		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
2365  		    (!sk_conn && msg_connected(hdr)) ||
2366  		    (!grp && msg_in_group(hdr)))
2367  			err = TIPC_ERR_NO_PORT;
2368  		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
2369  			trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
2370  					   "err_overload2!");
2371  			atomic_inc(&sk->sk_drops);
2372  			err = TIPC_ERR_OVERLOAD;
2373  		}
2374  
2375  		if (unlikely(err)) {
2376  			if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
2377  				trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
2378  						      "@filter_rcv!");
2379  				__skb_queue_tail(xmitq, skb);
2380  			}
2381  			err = TIPC_OK;
2382  			continue;
2383  		}
2384  		__skb_queue_tail(&sk->sk_receive_queue, skb);
2385  		skb_set_owner_r(skb, sk);
2386  		trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
2387  					 "rcvq >90% allocated!");
2388  		sk->sk_data_ready(sk);
2389  	}
2390  }
2391  
2392  /**
2393   * tipc_sk_backlog_rcv - handle incoming message from backlog queue
2394   * @sk: socket
2395   * @skb: message
2396   *
2397   * Caller must hold socket lock
2398   */
2399  static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
2400  {
2401  	unsigned int before = sk_rmem_alloc_get(sk);
2402  	struct sk_buff_head xmitq;
2403  	unsigned int added;
2404  
2405  	__skb_queue_head_init(&xmitq);
2406  
2407  	tipc_sk_filter_rcv(sk, skb, &xmitq);
2408  	added = sk_rmem_alloc_get(sk) - before;
2409  	atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
2410  
2411  	/* Send pending response/rejected messages, if any */
2412  	tipc_node_distr_xmit(sock_net(sk), &xmitq);
2413  	return 0;
2414  }
2415  
2416  /**
2417   * tipc_sk_enqueue - extract all buffers with destination 'dport' from
2418   *                   inputq and try adding them to socket or backlog queue
2419   * @inputq: list of incoming buffers with potentially different destinations
2420   * @sk: socket where the buffers should be enqueued
2421   * @dport: port number for the socket
2422   * @xmitq: output queue
2423   *
2424   * Caller must hold socket lock
2425   */
2426  static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
2427  			    u32 dport, struct sk_buff_head *xmitq)
2428  {
2429  	unsigned long time_limit = jiffies + usecs_to_jiffies(20000);
2430  	struct sk_buff *skb;
2431  	unsigned int lim;
2432  	atomic_t *dcnt;
2433  	u32 onode;
2434  
2435  	while (skb_queue_len(inputq)) {
2436  		if (unlikely(time_after_eq(jiffies, time_limit)))
2437  			return;
2438  
2439  		skb = tipc_skb_dequeue(inputq, dport);
2440  		if (unlikely(!skb))
2441  			return;
2442  
2443  		/* Add message directly to receive queue if possible */
2444  		if (!sock_owned_by_user(sk)) {
2445  			tipc_sk_filter_rcv(sk, skb, xmitq);
2446  			continue;
2447  		}
2448  
2449  		/* Try backlog, compensating for double-counted bytes */
2450  		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
2451  		if (!sk->sk_backlog.len)
2452  			atomic_set(dcnt, 0);
2453  		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
2454  		if (likely(!sk_add_backlog(sk, skb, lim))) {
2455  			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
2456  						 "bklg & rcvq >90% allocated!");
2457  			continue;
2458  		}
2459  
2460  		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
2461  		/* Overload => reject message back to sender */
2462  		onode = tipc_own_addr(sock_net(sk));
2463  		atomic_inc(&sk->sk_drops);
2464  		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
2465  			trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
2466  					      "@sk_enqueue!");
2467  			__skb_queue_tail(xmitq, skb);
2468  		}
2469  		break;
2470  	}
2471  }
2472  
2473  /**
2474   * tipc_sk_rcv - handle a chain of incoming buffers
2475   * @net: the associated network namespace
2476   * @inputq: buffer list containing the buffers
2477   * Consumes all buffers in list until inputq is empty
2478   * Note: may be called in multiple threads referring to the same queue
2479   */
2480  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
2481  {
2482  	struct sk_buff_head xmitq;
2483  	u32 dnode, dport = 0;
2484  	int err;
2485  	struct tipc_sock *tsk;
2486  	struct sock *sk;
2487  	struct sk_buff *skb;
2488  
2489  	__skb_queue_head_init(&xmitq);
2490  	while (skb_queue_len(inputq)) {
2491  		dport = tipc_skb_peek_port(inputq, dport);
2492  		tsk = tipc_sk_lookup(net, dport);
2493  
2494  		if (likely(tsk)) {
2495  			sk = &tsk->sk;
2496  			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
2497  				tipc_sk_enqueue(inputq, sk, dport, &xmitq);
2498  				spin_unlock_bh(&sk->sk_lock.slock);
2499  			}
2500  			/* Send pending response/rejected messages, if any */
2501  			tipc_node_distr_xmit(sock_net(sk), &xmitq);
2502  			sock_put(sk);
2503  			continue;
2504  		}
2505  		/* No destination socket => dequeue skb if still there */
2506  		skb = tipc_skb_dequeue(inputq, dport);
2507  		if (!skb)
2508  			return;
2509  
2510  		/* Try secondary lookup if unresolved named message */
2511  		err = TIPC_ERR_NO_PORT;
2512  		if (tipc_msg_lookup_dest(net, skb, &err))
2513  			goto xmit;
2514  
2515  		/* Prepare for message rejection */
2516  		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
2517  			continue;
2518  
2519  		trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
2520  xmit:
2521  		dnode = msg_destnode(buf_msg(skb));
2522  		tipc_node_xmit_skb(net, skb, dnode, dport);
2523  	}
2524  }
2525  
2526  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
2527  {
2528  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
2529  	struct sock *sk = sock->sk;
2530  	int done;
2531  
2532  	do {
2533  		int err = sock_error(sk);
2534  		if (err)
2535  			return err;
2536  		if (!*timeo_p)
2537  			return -ETIMEDOUT;
2538  		if (signal_pending(current))
2539  			return sock_intr_errno(*timeo_p);
2540  		if (sk->sk_state == TIPC_DISCONNECTING)
2541  			break;
2542  
2543  		add_wait_queue(sk_sleep(sk), &wait);
2544  		done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
2545  				     &wait);
2546  		remove_wait_queue(sk_sleep(sk), &wait);
2547  	} while (!done);
2548  	return 0;
2549  }
2550  
2551  static bool tipc_sockaddr_is_sane(struct sockaddr_tipc *addr)
2552  {
2553  	if (addr->family != AF_TIPC)
2554  		return false;
2555  	if (addr->addrtype == TIPC_SERVICE_RANGE)
2556  		return (addr->addr.nameseq.lower <= addr->addr.nameseq.upper);
2557  	return (addr->addrtype == TIPC_SERVICE_ADDR ||
2558  		addr->addrtype == TIPC_SOCKET_ADDR);
2559  }
2560  
2561  /**
2562   * tipc_connect - establish a connection to another TIPC port
2563   * @sock: socket structure
2564   * @dest: socket address for destination port
2565   * @destlen: size of socket address data structure
2566   * @flags: file-related flags associated with socket
2567   *
2568   * Return: 0 on success, errno otherwise
2569   */
2570  static int tipc_connect(struct socket *sock, struct sockaddr *dest,
2571  			int destlen, int flags)
2572  {
2573  	struct sock *sk = sock->sk;
2574  	struct tipc_sock *tsk = tipc_sk(sk);
2575  	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
2576  	struct msghdr m = {NULL,};
2577  	long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
2578  	int previous;
2579  	int res = 0;
2580  
2581  	if (destlen != sizeof(struct sockaddr_tipc))
2582  		return -EINVAL;
2583  
2584  	lock_sock(sk);
2585  
2586  	if (tsk->group) {
2587  		res = -EINVAL;
2588  		goto exit;
2589  	}
2590  
2591  	if (dst->family == AF_UNSPEC) {
2592  		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
2593  		if (!tipc_sk_type_connectionless(sk))
2594  			res = -EINVAL;
2595  		goto exit;
2596  	}
2597  	if (!tipc_sockaddr_is_sane(dst)) {
2598  		res = -EINVAL;
2599  		goto exit;
2600  	}
2601  	/* DGRAM/RDM connect(), just save the destaddr */
2602  	if (tipc_sk_type_connectionless(sk)) {
2603  		memcpy(&tsk->peer, dest, destlen);
2604  		goto exit;
2605  	} else if (dst->addrtype == TIPC_SERVICE_RANGE) {
2606  		res = -EINVAL;
2607  		goto exit;
2608  	}
2609  
2610  	previous = sk->sk_state;
2611  
2612  	switch (sk->sk_state) {
2613  	case TIPC_OPEN:
2614  		/* Send a 'SYN-' to destination */
2615  		m.msg_name = dest;
2616  		m.msg_namelen = destlen;
2617  
2618  		/* If the connect is non-blocking, set MSG_DONTWAIT so that
2619  		 * send_msg() never blocks.
2620  		 */
2621  		if (!timeout)
2622  			m.msg_flags = MSG_DONTWAIT;
2623  
2624  		res = __tipc_sendmsg(sock, &m, 0);
2625  		if ((res < 0) && (res != -EWOULDBLOCK))
2626  			goto exit;
2627  
2628  		/* Just entered TIPC_CONNECTING state; the only
2629  		 * difference is that the return value in the non-blocking
2630  		 * case is EINPROGRESS rather than EALREADY.
2631  		 */
2632  		res = -EINPROGRESS;
2633  		fallthrough;
2634  	case TIPC_CONNECTING:
2635  		if (!timeout) {
2636  			if (previous == TIPC_CONNECTING)
2637  				res = -EALREADY;
2638  			goto exit;
2639  		}
2640  		timeout = msecs_to_jiffies(timeout);
2641  		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
2642  		res = tipc_wait_for_connect(sock, &timeout);
2643  		break;
2644  	case TIPC_ESTABLISHED:
2645  		res = -EISCONN;
2646  		break;
2647  	default:
2648  		res = -EINVAL;
2649  	}
2650  
2651  exit:
2652  	release_sock(sk);
2653  	return res;
2654  }
2655  
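/* Example: connecting to a service address from user space. A minimal
 * sketch, assuming <linux/tipc.h>; the {type, instance} values are
 * hypothetical. With O_NONBLOCK set, the call instead returns
 * EINPROGRESS and completion can be awaited with poll().
 *
 *	struct sockaddr_tipc srv = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *	};
 *	int sd = socket(AF_TIPC, SOCK_STREAM, 0);
 *
 *	if (connect(sd, (struct sockaddr *)&srv, sizeof(srv)) == 0) {
 *		// socket is now in TIPC_ESTABLISHED state
 *	}
 */
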
2656  /**
2657   * tipc_listen - allow socket to listen for incoming connections
2658   * @sock: socket structure
2659   * @len: (unused)
2660   *
2661   * Return: 0 on success, errno otherwise
2662   */
2663  static int tipc_listen(struct socket *sock, int len)
2664  {
2665  	struct sock *sk = sock->sk;
2666  	int res;
2667  
2668  	lock_sock(sk);
2669  	res = tipc_set_sk_state(sk, TIPC_LISTEN);
2670  	release_sock(sk);
2671  
2672  	return res;
2673  }
2674  
2675  static int tipc_wait_for_accept(struct socket *sock, long timeo)
2676  {
2677  	struct sock *sk = sock->sk;
2678  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
2679  	int err;
2680  
2681  	/* True wake-one mechanism for incoming connections: only
2682  	 * one process gets woken up, not the 'whole herd'.
2683  	 * Since we do not 'race & poll' for established sockets
2684  	 * anymore, the common case will execute the loop only once.
2685  	 */
2686  	for (;;) {
2687  		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2688  			add_wait_queue(sk_sleep(sk), &wait);
2689  			release_sock(sk);
2690  			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
2691  			lock_sock(sk);
2692  			remove_wait_queue(sk_sleep(sk), &wait);
2693  		}
2694  		err = 0;
2695  		if (!skb_queue_empty(&sk->sk_receive_queue))
2696  			break;
2697  		err = -EAGAIN;
2698  		if (!timeo)
2699  			break;
2700  		err = sock_intr_errno(timeo);
2701  		if (signal_pending(current))
2702  			break;
2703  	}
2704  	return err;
2705  }
2706  
2707  /**
2708   * tipc_accept - wait for connection request
2709   * @sock: listening socket
2710   * @new_sock: new socket that is to be connected
2711   * @flags: file-related flags associated with socket
2712   * @kern: caused by kernel or by userspace?
2713   *
2714   * Return: 0 on success, errno otherwise
2715   */
2716  static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
2717  		       bool kern)
2718  {
2719  	struct sock *new_sk, *sk = sock->sk;
2720  	struct tipc_sock *new_tsock;
2721  	struct msghdr m = {NULL,};
2722  	struct tipc_msg *msg;
2723  	struct sk_buff *buf;
2724  	long timeo;
2725  	int res;
2726  
2727  	lock_sock(sk);
2728  
2729  	if (sk->sk_state != TIPC_LISTEN) {
2730  		res = -EINVAL;
2731  		goto exit;
2732  	}
2733  	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2734  	res = tipc_wait_for_accept(sock, timeo);
2735  	if (res)
2736  		goto exit;
2737  
2738  	buf = skb_peek(&sk->sk_receive_queue);
2739  
2740  	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
2741  	if (res)
2742  		goto exit;
2743  	security_sk_clone(sock->sk, new_sock->sk);
2744  
2745  	new_sk = new_sock->sk;
2746  	new_tsock = tipc_sk(new_sk);
2747  	msg = buf_msg(buf);
2748  
2749  	/* we lock on new_sk; but lockdep sees the lock on sk */
2750  	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2751  
2752  	/*
2753  	 * Reject any stray messages received by new socket
2754  	 * before the socket lock was taken (very, very unlikely)
2755  	 */
2756  	tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);
2757  
2758  	/* Connect new socket to its peer */
2759  	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2760  
2761  	tsk_set_importance(new_sk, msg_importance(msg));
2762  	if (msg_named(msg)) {
2763  		new_tsock->conn_addrtype = TIPC_SERVICE_ADDR;
2764  		msg_set_nametype(&new_tsock->phdr, msg_nametype(msg));
2765  		msg_set_nameinst(&new_tsock->phdr, msg_nameinst(msg));
2766  	}
2767  
2768  	/*
2769  	 * Respond to 'SYN-' by discarding it & returning 'ACK'.
2770  	 * Respond to 'SYN+' by queuing it on new socket & returning 'ACK'.
2771  	 */
2772  	if (!msg_data_sz(msg)) {
2773  		tsk_advance_rx_queue(sk);
2774  	} else {
2775  		__skb_dequeue(&sk->sk_receive_queue);
2776  		__skb_queue_head(&new_sk->sk_receive_queue, buf);
2777  		skb_set_owner_r(buf, new_sk);
2778  	}
2779  	__tipc_sendstream(new_sock, &m, 0);
2780  	release_sock(new_sk);
2781  exit:
2782  	release_sock(sk);
2783  	return res;
2784  }
2785  
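/* Example: a minimal server loop built on the two calls above. A
 * user-space sketch, assuming <linux/tipc.h>; the service type and
 * instance are hypothetical, and the backlog argument is unused.
 *
 *	struct sockaddr_tipc srv = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *	};
 *	int sd = socket(AF_TIPC, SOCK_SEQPACKET, 0);
 *
 *	bind(sd, (struct sockaddr *)&srv, sizeof(srv));
 *	listen(sd, 0);
 *	for (;;) {
 *		int peer = accept(sd, NULL, NULL);
 *		// serve 'peer', then close(peer)
 *	}
 */
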
2786  /**
2787   * tipc_shutdown - shutdown socket connection
2788   * @sock: socket structure
2789   * @how: direction to close (must be SHUT_RDWR)
2790   *
2791   * Terminates connection (if necessary), then purges socket's receive queue.
2792   *
2793   * Return: 0 on success, errno otherwise
2794   */
2795  static int tipc_shutdown(struct socket *sock, int how)
2796  {
2797  	struct sock *sk = sock->sk;
2798  	int res;
2799  
2800  	if (how != SHUT_RDWR)
2801  		return -EINVAL;
2802  
2803  	lock_sock(sk);
2804  
2805  	trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
2806  	__tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2807  	sk->sk_shutdown = SHUTDOWN_MASK;
2808  
2809  	if (sk->sk_state == TIPC_DISCONNECTING) {
2810  		/* Discard any unreceived messages */
2811  		__skb_queue_purge(&sk->sk_receive_queue);
2812  
2813  		res = 0;
2814  	} else {
2815  		res = -ENOTCONN;
2816  	}
2817  	/* Wake up anyone sleeping in poll. */
2818  	sk->sk_state_change(sk);
2819  
2820  	release_sock(sk);
2821  	return res;
2822  }
2823  
2824  static void tipc_sk_check_probing_state(struct sock *sk,
2825  					struct sk_buff_head *list)
2826  {
2827  	struct tipc_sock *tsk = tipc_sk(sk);
2828  	u32 pnode = tsk_peer_node(tsk);
2829  	u32 pport = tsk_peer_port(tsk);
2830  	u32 self = tsk_own_node(tsk);
2831  	u32 oport = tsk->portid;
2832  	struct sk_buff *skb;
2833  
2834  	if (tsk->probe_unacked) {
2835  		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2836  		sk->sk_err = ECONNABORTED;
2837  		tipc_node_remove_conn(sock_net(sk), pnode, pport);
2838  		sk->sk_state_change(sk);
2839  		return;
2840  	}
2841  	/* Prepare new probe */
2842  	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2843  			      pnode, self, pport, oport, TIPC_OK);
2844  	if (skb)
2845  		__skb_queue_tail(list, skb);
2846  	tsk->probe_unacked = true;
2847  	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2848  }
2849  
2850  static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
2851  {
2852  	struct tipc_sock *tsk = tipc_sk(sk);
2853  
2854  	/* Try again later if dest link is congested */
2855  	if (tsk->cong_link_cnt) {
2856  		sk_reset_timer(sk, &sk->sk_timer,
2857  			       jiffies + msecs_to_jiffies(100));
2858  		return;
2859  	}
2860  	/* Prepare SYN for retransmit */
2861  	tipc_msg_skb_clone(&sk->sk_write_queue, list);
2862  }
2863  
2864  static void tipc_sk_timeout(struct timer_list *t)
2865  {
2866  	struct sock *sk = from_timer(sk, t, sk_timer);
2867  	struct tipc_sock *tsk = tipc_sk(sk);
2868  	u32 pnode = tsk_peer_node(tsk);
2869  	struct sk_buff_head list;
2870  	int rc = 0;
2871  
2872  	__skb_queue_head_init(&list);
2873  	bh_lock_sock(sk);
2874  
2875  	/* Try again later if socket is busy */
2876  	if (sock_owned_by_user(sk)) {
2877  		sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2878  		bh_unlock_sock(sk);
2879  		sock_put(sk);
2880  		return;
2881  	}
2882  
2883  	if (sk->sk_state == TIPC_ESTABLISHED)
2884  		tipc_sk_check_probing_state(sk, &list);
2885  	else if (sk->sk_state == TIPC_CONNECTING)
2886  		tipc_sk_retry_connect(sk, &list);
2887  
2888  	bh_unlock_sock(sk);
2889  
2890  	if (!skb_queue_empty(&list))
2891  		rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
2892  
2893  	/* SYN messages may cause link congestion */
2894  	if (rc == -ELINKCONG) {
2895  		tipc_dest_push(&tsk->cong_links, pnode, 0);
2896  		tsk->cong_link_cnt = 1;
2897  	}
2898  	sock_put(sk);
2899  }
2900  
2901  static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua)
2902  {
2903  	struct sock *sk = &tsk->sk;
2904  	struct net *net = sock_net(sk);
2905  	struct tipc_socket_addr skaddr;
2906  	struct publication *p;
2907  	u32 key;
2908  
2909  	if (tipc_sk_connected(sk))
2910  		return -EINVAL;
2911  	key = tsk->portid + tsk->pub_count + 1;
2912  	if (key == tsk->portid)
2913  		return -EADDRINUSE;
2914  	skaddr.ref = tsk->portid;
2915  	skaddr.node = tipc_own_addr(net);
2916  	p = tipc_nametbl_publish(net, ua, &skaddr, key);
2917  	if (unlikely(!p))
2918  		return -EINVAL;
2919  
2920  	list_add(&p->binding_sock, &tsk->publications);
2921  	tsk->pub_count++;
2922  	tsk->published = true;
2923  	return 0;
2924  }
2925  
2926  static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua)
2927  {
2928  	struct net *net = sock_net(&tsk->sk);
2929  	struct publication *safe, *p;
2930  	struct tipc_uaddr _ua;
2931  	int rc = -EINVAL;
2932  
2933  	list_for_each_entry_safe(p, safe, &tsk->publications, binding_sock) {
2934  		if (!ua) {
2935  			tipc_uaddr(&_ua, TIPC_SERVICE_RANGE, p->scope,
2936  				   p->sr.type, p->sr.lower, p->sr.upper);
2937  			tipc_nametbl_withdraw(net, &_ua, &p->sk, p->key);
2938  			continue;
2939  		}
2940  		/* Unbind specific publication */
2941  		if (p->scope != ua->scope)
2942  			continue;
2943  		if (p->sr.type != ua->sr.type)
2944  			continue;
2945  		if (p->sr.lower != ua->sr.lower)
2946  			continue;
2947  		if (p->sr.upper != ua->sr.upper)
2948  			break;
2949  		tipc_nametbl_withdraw(net, ua, &p->sk, p->key);
2950  		rc = 0;
2951  		break;
2952  	}
2953  	if (list_empty(&tsk->publications)) {
2954  		tsk->published = 0;
2955  		rc = 0;
2956  	}
2957  	return rc;
2958  }
2959  
2960  /* tipc_sk_reinit: set non-zero address in all existing sockets
2961   *                 when we go from standalone to network mode.
2962   */
2963  void tipc_sk_reinit(struct net *net)
2964  {
2965  	struct tipc_net *tn = net_generic(net, tipc_net_id);
2966  	struct rhashtable_iter iter;
2967  	struct tipc_sock *tsk;
2968  	struct tipc_msg *msg;
2969  
2970  	rhashtable_walk_enter(&tn->sk_rht, &iter);
2971  
2972  	do {
2973  		rhashtable_walk_start(&iter);
2974  
2975  		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2976  			sock_hold(&tsk->sk);
2977  			rhashtable_walk_stop(&iter);
2978  			lock_sock(&tsk->sk);
2979  			msg = &tsk->phdr;
2980  			msg_set_prevnode(msg, tipc_own_addr(net));
2981  			msg_set_orignode(msg, tipc_own_addr(net));
2982  			release_sock(&tsk->sk);
2983  			rhashtable_walk_start(&iter);
2984  			sock_put(&tsk->sk);
2985  		}
2986  
2987  		rhashtable_walk_stop(&iter);
2988  	} while (tsk == ERR_PTR(-EAGAIN));
2989  
2990  	rhashtable_walk_exit(&iter);
2991  }
2992  
2993  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2994  {
2995  	struct tipc_net *tn = net_generic(net, tipc_net_id);
2996  	struct tipc_sock *tsk;
2997  
2998  	rcu_read_lock();
2999  	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
3000  	if (tsk)
3001  		sock_hold(&tsk->sk);
3002  	rcu_read_unlock();
3003  
3004  	return tsk;
3005  }
3006  
3007  static int tipc_sk_insert(struct tipc_sock *tsk)
3008  {
3009  	struct sock *sk = &tsk->sk;
3010  	struct net *net = sock_net(sk);
3011  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3012  	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
3013  	u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
3014  
3015  	while (remaining--) {
3016  		portid++;
3017  		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
3018  			portid = TIPC_MIN_PORT;
3019  		tsk->portid = portid;
3020  		sock_hold(&tsk->sk);
3021  		if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
3022  						   tsk_rht_params))
3023  			return 0;
3024  		sock_put(&tsk->sk);
3025  	}
3026  
3027  	return -1;
3028  }
3029  
3030  static void tipc_sk_remove(struct tipc_sock *tsk)
3031  {
3032  	struct sock *sk = &tsk->sk;
3033  	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
3034  
3035  	if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
3036  		WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
3037  		__sock_put(sk);
3038  	}
3039  }
3040  
3041  static const struct rhashtable_params tsk_rht_params = {
3042  	.nelem_hint = 192,
3043  	.head_offset = offsetof(struct tipc_sock, node),
3044  	.key_offset = offsetof(struct tipc_sock, portid),
3045  	.key_len = sizeof(u32), /* portid */
3046  	.max_size = 1048576,
3047  	.min_size = 256,
3048  	.automatic_shrinking = true,
3049  };
3050  
3051  int tipc_sk_rht_init(struct net *net)
3052  {
3053  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3054  
3055  	return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
3056  }
3057  
3058  void tipc_sk_rht_destroy(struct net *net)
3059  {
3060  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3061  
3062  	/* Wait for socket readers to complete */
3063  	synchronize_net();
3064  
3065  	rhashtable_destroy(&tn->sk_rht);
3066  }
3067  
3068  static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
3069  {
3070  	struct net *net = sock_net(&tsk->sk);
3071  	struct tipc_group *grp = tsk->group;
3072  	struct tipc_msg *hdr = &tsk->phdr;
3073  	struct tipc_uaddr ua;
3074  	int rc;
3075  
3076  	if (mreq->type < TIPC_RESERVED_TYPES)
3077  		return -EACCES;
3078  	if (mreq->scope > TIPC_NODE_SCOPE)
3079  		return -EINVAL;
3080  	if (mreq->scope != TIPC_NODE_SCOPE)
3081  		mreq->scope = TIPC_CLUSTER_SCOPE;
3082  	if (grp)
3083  		return -EACCES;
3084  	grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
3085  	if (!grp)
3086  		return -ENOMEM;
3087  	tsk->group = grp;
3088  	msg_set_lookup_scope(hdr, mreq->scope);
3089  	msg_set_nametype(hdr, mreq->type);
3090  	msg_set_dest_droppable(hdr, true);
3091  	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, mreq->scope,
3092  		   mreq->type, mreq->instance, mreq->instance);
3093  	tipc_nametbl_build_group(net, grp, &ua);
3094  	rc = tipc_sk_publish(tsk, &ua);
3095  	if (rc) {
3096  		tipc_group_delete(net, grp);
3097  		tsk->group = NULL;
3098  		return rc;
3099  	}
3100  	/* Eliminate any risk that a broadcast overtakes sent JOINs */
3101  	tsk->mc_method.rcast = true;
3102  	tsk->mc_method.mandatory = true;
3103  	tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
3104  	return rc;
3105  }
3106  
3107  static int tipc_sk_leave(struct tipc_sock *tsk)
3108  {
3109  	struct net *net = sock_net(&tsk->sk);
3110  	struct tipc_group *grp = tsk->group;
3111  	struct tipc_uaddr ua;
3112  	int scope;
3113  
3114  	if (!grp)
3115  		return -EINVAL;
3116  	ua.addrtype = TIPC_SERVICE_RANGE;
3117  	tipc_group_self(grp, &ua.sr, &scope);
3118  	ua.scope = scope;
3119  	tipc_group_delete(net, grp);
3120  	tsk->group = NULL;
3121  	tipc_sk_withdraw(tsk, &ua);
3122  	return 0;
3123  }
3124  
3125  /**
3126   * tipc_setsockopt - set socket option
3127   * @sock: socket structure
3128   * @lvl: option level
3129   * @opt: option identifier
3130   * @ov: pointer to new option value
3131   * @ol: length of option value
3132   *
3133   * For stream sockets only, accepts and ignores all IPPROTO_TCP options
3134   * (to ease compatibility).
3135   *
3136   * Return: 0 on success, errno otherwise
3137   */
3138  static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
3139  			   sockptr_t ov, unsigned int ol)
3140  {
3141  	struct sock *sk = sock->sk;
3142  	struct tipc_sock *tsk = tipc_sk(sk);
3143  	struct tipc_group_req mreq;
3144  	u32 value = 0;
3145  	int res = 0;
3146  
3147  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3148  		return 0;
3149  	if (lvl != SOL_TIPC)
3150  		return -ENOPROTOOPT;
3151  
3152  	switch (opt) {
3153  	case TIPC_IMPORTANCE:
3154  	case TIPC_SRC_DROPPABLE:
3155  	case TIPC_DEST_DROPPABLE:
3156  	case TIPC_CONN_TIMEOUT:
3157  	case TIPC_NODELAY:
3158  		if (ol < sizeof(value))
3159  			return -EINVAL;
3160  		if (copy_from_sockptr(&value, ov, sizeof(u32)))
3161  			return -EFAULT;
3162  		break;
3163  	case TIPC_GROUP_JOIN:
3164  		if (ol < sizeof(mreq))
3165  			return -EINVAL;
3166  		if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
3167  			return -EFAULT;
3168  		break;
3169  	default:
3170  		if (!sockptr_is_null(ov) || ol)
3171  			return -EINVAL;
3172  	}
3173  
3174  	lock_sock(sk);
3175  
3176  	switch (opt) {
3177  	case TIPC_IMPORTANCE:
3178  		res = tsk_set_importance(sk, value);
3179  		break;
3180  	case TIPC_SRC_DROPPABLE:
3181  		if (sock->type != SOCK_STREAM)
3182  			tsk_set_unreliable(tsk, value);
3183  		else
3184  			res = -ENOPROTOOPT;
3185  		break;
3186  	case TIPC_DEST_DROPPABLE:
3187  		tsk_set_unreturnable(tsk, value);
3188  		break;
3189  	case TIPC_CONN_TIMEOUT:
3190  		tipc_sk(sk)->conn_timeout = value;
3191  		break;
3192  	case TIPC_MCAST_BROADCAST:
3193  		tsk->mc_method.rcast = false;
3194  		tsk->mc_method.mandatory = true;
3195  		break;
3196  	case TIPC_MCAST_REPLICAST:
3197  		tsk->mc_method.rcast = true;
3198  		tsk->mc_method.mandatory = true;
3199  		break;
3200  	case TIPC_GROUP_JOIN:
3201  		res = tipc_sk_join(tsk, &mreq);
3202  		break;
3203  	case TIPC_GROUP_LEAVE:
3204  		res = tipc_sk_leave(tsk);
3205  		break;
3206  	case TIPC_NODELAY:
3207  		tsk->nodelay = !!value;
3208  		tsk_set_nagle(tsk);
3209  		break;
3210  	default:
3211  		res = -EINVAL;
3212  	}
3213  
3214  	release_sock(sk);
3215  
3216  	return res;
3217  }
3218  
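/* Example: joining a communication group through the option handled
 * above. A minimal user-space sketch, assuming <linux/tipc.h>; the
 * type and instance values are hypothetical.
 *
 *	struct tipc_group_req mreq = {
 *		.type = 4711,
 *		.instance = 1,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *	};
 *
 *	setsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &mreq, sizeof(mreq));
 */
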
3219  /**
3220   * tipc_getsockopt - get socket option
3221   * @sock: socket structure
3222   * @lvl: option level
3223   * @opt: option identifier
3224   * @ov: receptacle for option value
3225   * @ol: receptacle for length of option value
3226   *
3227   * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
3228   * (to ease compatibility).
3229   *
3230   * Return: 0 on success, errno otherwise
3231   */
3232  static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
3233  			   char __user *ov, int __user *ol)
3234  {
3235  	struct sock *sk = sock->sk;
3236  	struct tipc_sock *tsk = tipc_sk(sk);
3237  	struct tipc_service_range seq;
3238  	int len, scope;
3239  	u32 value;
3240  	int res;
3241  
3242  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3243  		return put_user(0, ol);
3244  	if (lvl != SOL_TIPC)
3245  		return -ENOPROTOOPT;
3246  	res = get_user(len, ol);
3247  	if (res)
3248  		return res;
3249  
3250  	lock_sock(sk);
3251  
3252  	switch (opt) {
3253  	case TIPC_IMPORTANCE:
3254  		value = tsk_importance(tsk);
3255  		break;
3256  	case TIPC_SRC_DROPPABLE:
3257  		value = tsk_unreliable(tsk);
3258  		break;
3259  	case TIPC_DEST_DROPPABLE:
3260  		value = tsk_unreturnable(tsk);
3261  		break;
3262  	case TIPC_CONN_TIMEOUT:
3263  		value = tsk->conn_timeout;
3264  		/* no need to set "res", since already 0 at this point */
3265  		break;
3266  	case TIPC_NODE_RECVQ_DEPTH:
3267  		value = 0; /* was tipc_queue_size, now obsolete */
3268  		break;
3269  	case TIPC_SOCK_RECVQ_DEPTH:
3270  		value = skb_queue_len(&sk->sk_receive_queue);
3271  		break;
3272  	case TIPC_SOCK_RECVQ_USED:
3273  		value = sk_rmem_alloc_get(sk);
3274  		break;
3275  	case TIPC_GROUP_JOIN:
3276  		seq.type = 0;
3277  		if (tsk->group)
3278  			tipc_group_self(tsk->group, &seq, &scope);
3279  		value = seq.type;
3280  		break;
3281  	default:
3282  		res = -EINVAL;
3283  	}
3284  
3285  	release_sock(sk);
3286  
3287  	if (res)
3288  		return res;	/* "get" failed */
3289  
3290  	if (len < sizeof(value))
3291  		return -EINVAL;
3292  
3293  	if (copy_to_user(ov, &value, sizeof(value)))
3294  		return -EFAULT;
3295  
3296  	return put_user(sizeof(value), ol);
3297  }
3298  
3299  static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
3300  {
3301  	struct net *net = sock_net(sock->sk);
3302  	struct tipc_sioc_nodeid_req nr = {0};
3303  	struct tipc_sioc_ln_req lnr;
3304  	void __user *argp = (void __user *)arg;
3305  
3306  	switch (cmd) {
3307  	case SIOCGETLINKNAME:
3308  		if (copy_from_user(&lnr, argp, sizeof(lnr)))
3309  			return -EFAULT;
3310  		if (!tipc_node_get_linkname(net,
3311  					    lnr.bearer_id & 0xffff, lnr.peer,
3312  					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
3313  			if (copy_to_user(argp, &lnr, sizeof(lnr)))
3314  				return -EFAULT;
3315  			return 0;
3316  		}
3317  		return -EADDRNOTAVAIL;
3318  	case SIOCGETNODEID:
3319  		if (copy_from_user(&nr, argp, sizeof(nr)))
3320  			return -EFAULT;
3321  		if (!tipc_node_get_id(net, nr.peer, nr.node_id))
3322  			return -EADDRNOTAVAIL;
3323  		if (copy_to_user(argp, &nr, sizeof(nr)))
3324  			return -EFAULT;
3325  		return 0;
3326  	default:
3327  		return -ENOIOCTLCMD;
3328  	}
3329  }
3330  
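/* Example: querying a peer's 128-bit node identity with the ioctl
 * handled above. A minimal user-space sketch, assuming <sys/ioctl.h>
 * and <linux/tipc.h>; peer_addr is a hypothetical node hash.
 *
 *	struct tipc_sioc_nodeid_req nr = { .peer = peer_addr };
 *
 *	if (ioctl(sd, SIOCGETNODEID, &nr) == 0) {
 *		// nr.node_id now holds the 16-byte node identity
 *	}
 */
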
3331  static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
3332  {
3333  	struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
3334  	struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
3335  	u32 onode = tipc_own_addr(sock_net(sock1->sk));
3336  
3337  	tsk1->peer.family = AF_TIPC;
3338  	tsk1->peer.addrtype = TIPC_SOCKET_ADDR;
3339  	tsk1->peer.scope = TIPC_NODE_SCOPE;
3340  	tsk1->peer.addr.id.ref = tsk2->portid;
3341  	tsk1->peer.addr.id.node = onode;
3342  	tsk2->peer.family = AF_TIPC;
3343  	tsk2->peer.addrtype = TIPC_SOCKET_ADDR;
3344  	tsk2->peer.scope = TIPC_NODE_SCOPE;
3345  	tsk2->peer.addr.id.ref = tsk1->portid;
3346  	tsk2->peer.addr.id.node = onode;
3347  
3348  	tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
3349  	tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
3350  	return 0;
3351  }
3352  
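/* Example: user-space use of the socketpair path above. Note that
 * .socketpair appears in all three proto_ops tables below, so any TIPC
 * socket type may be used; a sketch, illustrative only.
 */
#include <stdio.h>
#include <sys/socket.h>
#include <linux/tipc.h>

static int make_tipc_pair(int sv[2])
{
	/* Both ends come back connected to each other on the own node,
	 * exactly as tipc_socketpair() wires tsk1 and tsk2 above.
	 */
	if (socketpair(AF_TIPC, SOCK_SEQPACKET, 0, sv)) {
		perror("socketpair");
		return -1;
	}
	return 0;
}
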
3353  /* Protocol switches for the various types of TIPC sockets */
3354  
3355  static const struct proto_ops msg_ops = {
3356  	.owner		= THIS_MODULE,
3357  	.family		= AF_TIPC,
3358  	.release	= tipc_release,
3359  	.bind		= tipc_bind,
3360  	.connect	= tipc_connect,
3361  	.socketpair	= tipc_socketpair,
3362  	.accept		= sock_no_accept,
3363  	.getname	= tipc_getname,
3364  	.poll		= tipc_poll,
3365  	.ioctl		= tipc_ioctl,
3366  	.listen		= sock_no_listen,
3367  	.shutdown	= tipc_shutdown,
3368  	.setsockopt	= tipc_setsockopt,
3369  	.getsockopt	= tipc_getsockopt,
3370  	.sendmsg	= tipc_sendmsg,
3371  	.recvmsg	= tipc_recvmsg,
3372  	.mmap		= sock_no_mmap,
3373  	.sendpage	= sock_no_sendpage
3374  };
3375  
3376  static const struct proto_ops packet_ops = {
3377  	.owner		= THIS_MODULE,
3378  	.family		= AF_TIPC,
3379  	.release	= tipc_release,
3380  	.bind		= tipc_bind,
3381  	.connect	= tipc_connect,
3382  	.socketpair	= tipc_socketpair,
3383  	.accept		= tipc_accept,
3384  	.getname	= tipc_getname,
3385  	.poll		= tipc_poll,
3386  	.ioctl		= tipc_ioctl,
3387  	.listen		= tipc_listen,
3388  	.shutdown	= tipc_shutdown,
3389  	.setsockopt	= tipc_setsockopt,
3390  	.getsockopt	= tipc_getsockopt,
3391  	.sendmsg	= tipc_send_packet,
3392  	.recvmsg	= tipc_recvmsg,
3393  	.mmap		= sock_no_mmap,
3394  	.sendpage	= sock_no_sendpage
3395  };
3396  
3397  static const struct proto_ops stream_ops = {
3398  	.owner		= THIS_MODULE,
3399  	.family		= AF_TIPC,
3400  	.release	= tipc_release,
3401  	.bind		= tipc_bind,
3402  	.connect	= tipc_connect,
3403  	.socketpair	= tipc_socketpair,
3404  	.accept		= tipc_accept,
3405  	.getname	= tipc_getname,
3406  	.poll		= tipc_poll,
3407  	.ioctl		= tipc_ioctl,
3408  	.listen		= tipc_listen,
3409  	.shutdown	= tipc_shutdown,
3410  	.setsockopt	= tipc_setsockopt,
3411  	.getsockopt	= tipc_getsockopt,
3412  	.sendmsg	= tipc_sendstream,
3413  	.recvmsg	= tipc_recvstream,
3414  	.mmap		= sock_no_mmap,
3415  	.sendpage	= sock_no_sendpage
3416  };
3417  
3418  static const struct net_proto_family tipc_family_ops = {
3419  	.owner		= THIS_MODULE,
3420  	.family		= AF_TIPC,
3421  	.create		= tipc_sk_create
3422  };
3423  
3424  static struct proto tipc_proto = {
3425  	.name		= "TIPC",
3426  	.owner		= THIS_MODULE,
3427  	.obj_size	= sizeof(struct tipc_sock),
3428  	.sysctl_rmem	= sysctl_tipc_rmem
3429  };
3430  
3431  /**
3432   * tipc_socket_init - initialize TIPC socket interface
3433   *
3434   * Return: 0 on success, errno otherwise
3435   */
3436  int tipc_socket_init(void)
3437  {
3438  	int res;
3439  
3440  	res = proto_register(&tipc_proto, 1);
3441  	if (res) {
3442  		pr_err("Failed to register TIPC protocol type\n");
3443  		goto out;
3444  	}
3445  
3446  	res = sock_register(&tipc_family_ops);
3447  	if (res) {
3448  		pr_err("Failed to register TIPC socket type\n");
3449  		proto_unregister(&tipc_proto);
3450  		goto out;
3451  	}
3452   out:
3453  	return res;
3454  }
3455  
3456  /**
3457   * tipc_socket_stop - stop TIPC socket interface
3458   */
3459  void tipc_socket_stop(void)
3460  {
3461  	sock_unregister(tipc_family_ops.family);
3462  	proto_unregister(&tipc_proto);
3463  }
3464  
3465  /* Caller should hold socket lock for the passed tipc socket. */
3466  static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
3467  {
3468  	u32 peer_node, peer_port;
3469  	u32 conn_type, conn_instance;
3470  	struct nlattr *nest;
3471  
3472  	peer_node = tsk_peer_node(tsk);
3473  	peer_port = tsk_peer_port(tsk);
3474  	conn_type = msg_nametype(&tsk->phdr);
3475  	conn_instance = msg_nameinst(&tsk->phdr);
3476  	nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
3477  	if (!nest)
3478  		return -EMSGSIZE;
3479  
3480  	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
3481  		goto msg_full;
3482  	if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
3483  		goto msg_full;
3484  
3485  	if (tsk->conn_addrtype != 0) {
3486  		if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
3487  			goto msg_full;
3488  		if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, conn_type))
3489  			goto msg_full;
3490  		if (nla_put_u32(skb, TIPC_NLA_CON_INST, conn_instance))
3491  			goto msg_full;
3492  	}
3493  	nla_nest_end(skb, nest);
3494  
3495  	return 0;
3496  
3497  msg_full:
3498  	nla_nest_cancel(skb, nest);
3499  
3500  	return -EMSGSIZE;
3501  }
3502  
3503  static int __tipc_nl_add_sk_info(struct sk_buff *skb,
3504  				 struct tipc_sock *tsk)
3505  {
3506  	struct net *net = sock_net(skb->sk);
3507  	struct sock *sk = &tsk->sk;
3508  
3509  	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
3510  	    nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
3511  		return -EMSGSIZE;
3512  
3513  	if (tipc_sk_connected(sk)) {
3514  		if (__tipc_nl_add_sk_con(skb, tsk))
3515  			return -EMSGSIZE;
3516  	} else if (!list_empty(&tsk->publications)) {
3517  		if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
3518  			return -EMSGSIZE;
3519  	}
3520  	return 0;
3521  }
3522  
3523  /* Caller should hold socket lock for the passed tipc socket. */
3524  static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
3525  			    struct tipc_sock *tsk)
3526  {
3527  	struct nlattr *attrs;
3528  	void *hdr;
3529  
3530  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3531  			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
3532  	if (!hdr)
3533  		goto msg_cancel;
3534  
3535  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3536  	if (!attrs)
3537  		goto genlmsg_cancel;
3538  
3539  	if (__tipc_nl_add_sk_info(skb, tsk))
3540  		goto attr_msg_cancel;
3541  
3542  	nla_nest_end(skb, attrs);
3543  	genlmsg_end(skb, hdr);
3544  
3545  	return 0;
3546  
3547  attr_msg_cancel:
3548  	nla_nest_cancel(skb, attrs);
3549  genlmsg_cancel:
3550  	genlmsg_cancel(skb, hdr);
3551  msg_cancel:
3552  	return -EMSGSIZE;
3553  }
3554  
3555  int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
3556  		    int (*skb_handler)(struct sk_buff *skb,
3557  				       struct netlink_callback *cb,
3558  				       struct tipc_sock *tsk))
3559  {
3560  	struct rhashtable_iter *iter = (void *)cb->args[4];
3561  	struct tipc_sock *tsk;
3562  	int err;
3563  
3564  	rhashtable_walk_start(iter);
3565  	while ((tsk = rhashtable_walk_next(iter)) != NULL) {
3566  		if (IS_ERR(tsk)) {
3567  			err = PTR_ERR(tsk);
3568  			if (err == -EAGAIN) {
3569  				err = 0;
3570  				continue;
3571  			}
3572  			break;
3573  		}
3574  
3575  		sock_hold(&tsk->sk);
3576  		rhashtable_walk_stop(iter);
3577  		lock_sock(&tsk->sk);
3578  		err = skb_handler(skb, cb, tsk);
3579  		if (err) {
3580  			release_sock(&tsk->sk);
3581  			sock_put(&tsk->sk);
3582  			goto out;
3583  		}
3584  		release_sock(&tsk->sk);
3585  		rhashtable_walk_start(iter);
3586  		sock_put(&tsk->sk);
3587  	}
3588  	rhashtable_walk_stop(iter);
3589  out:
3590  	return skb->len;
3591  }
3592  EXPORT_SYMBOL(tipc_nl_sk_walk);
3593  
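/* Example: a minimal, hypothetical skb_handler for tipc_nl_sk_walk(); the
 * walk invokes it with the socket lock held and stops on a nonzero return.
 * Using cb->args[5] as a counter is an assumption for illustration, not a
 * kernel convention.
 */
static int example_count_sk(struct sk_buff *skb, struct netlink_callback *cb,
			    struct tipc_sock *tsk)
{
	cb->args[5]++;		/* count every socket visited */
	return 0;		/* 0 => continue; nonzero => abort walk */
}
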
3594  int tipc_dump_start(struct netlink_callback *cb)
3595  {
3596  	return __tipc_dump_start(cb, sock_net(cb->skb->sk));
3597  }
3598  EXPORT_SYMBOL(tipc_dump_start);
3599  
3600  int __tipc_dump_start(struct netlink_callback *cb, struct net *net)
3601  {
3602  	/* tipc_nl_name_table_dump() uses cb->args[0...3]. */
3603  	struct rhashtable_iter *iter = (void *)cb->args[4];
3604  	struct tipc_net *tn = tipc_net(net);
3605  
3606  	if (!iter) {
3607  		iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3608  		if (!iter)
3609  			return -ENOMEM;
3610  
3611  		cb->args[4] = (long)iter;
3612  	}
3613  
3614  	rhashtable_walk_enter(&tn->sk_rht, iter);
3615  	return 0;
3616  }
3617  
3618  int tipc_dump_done(struct netlink_callback *cb)
3619  {
3620  	struct rhashtable_iter *hti = (void *)cb->args[4];
3621  
3622  	rhashtable_walk_exit(hti);
3623  	kfree(hti);
3624  	return 0;
3625  }
3626  EXPORT_SYMBOL(tipc_dump_done);
3627  
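/* Example: how the dump hooks above are typically wired into a generic
 * netlink op; modelled on the TIPC_NL_SOCK_GET entry in net/tipc/netlink.c,
 * but the field values here are a sketch, not the authoritative table.
 */
static const struct genl_ops example_sock_get_op = {
	.cmd	= TIPC_NL_SOCK_GET,
	.start	= tipc_dump_start,	/* allocate the rhashtable iterator */
	.dumpit	= tipc_nl_sk_dump,	/* walk sockets, fill the skb */
	.done	= tipc_dump_done,	/* free the iterator */
};
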
3628  int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
3629  			   struct tipc_sock *tsk, u32 sk_filter_state,
3630  			   u64 (*tipc_diag_gen_cookie)(struct sock *sk))
3631  {
3632  	struct sock *sk = &tsk->sk;
3633  	struct nlattr *attrs;
3634  	struct nlattr *stat;
3635  
3636  	/* filter response w.r.t. sk_state */
3637  	if (!(sk_filter_state & (1 << sk->sk_state)))
3638  		return 0;
3639  
3640  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3641  	if (!attrs)
3642  		goto msg_cancel;
3643  
3644  	if (__tipc_nl_add_sk_info(skb, tsk))
3645  		goto attr_msg_cancel;
3646  
3647  	if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
3648  	    nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
3649  	    nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
3650  	    nla_put_u32(skb, TIPC_NLA_SOCK_UID,
3651  			from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
3652  					 sock_i_uid(sk))) ||
3653  	    nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
3654  			      tipc_diag_gen_cookie(sk),
3655  			      TIPC_NLA_SOCK_PAD))
3656  		goto attr_msg_cancel;
3657  
3658  	stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
3659  	if (!stat)
3660  		goto attr_msg_cancel;
3661  
3662  	if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
3663  			skb_queue_len(&sk->sk_receive_queue)) ||
3664  	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
3665  			skb_queue_len(&sk->sk_write_queue)) ||
3666  	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
3667  			atomic_read(&sk->sk_drops)))
3668  		goto stat_msg_cancel;
3669  
3670  	if (tsk->cong_link_cnt &&
3671  	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
3672  		goto stat_msg_cancel;
3673  
3674  	if (tsk_conn_cong(tsk) &&
3675  	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
3676  		goto stat_msg_cancel;
3677  
3678  	nla_nest_end(skb, stat);
3679  
3680  	if (tsk->group)
3681  		if (tipc_group_fill_sock_diag(tsk->group, skb))
3682  			goto stat_msg_cancel;
3683  
3684  	nla_nest_end(skb, attrs);
3685  
3686  	return 0;
3687  
3688  stat_msg_cancel:
3689  	nla_nest_cancel(skb, stat);
3690  attr_msg_cancel:
3691  	nla_nest_cancel(skb, attrs);
3692  msg_cancel:
3693  	return -EMSGSIZE;
3694  }
3695  EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
3696  
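/* Example: a sketch of the tipc_diag_gen_cookie callback taken as the last
 * argument above; the diag module (net/tipc/diag.c) supplies the real one,
 * and this particular 64-bit packing is only an assumed illustration.
 */
static u64 example_diag_gen_cookie(struct sock *sk)
{
	/* pack a stable identity for the socket into 64 bits */
	return ((u64)tipc_sk(sk)->portid << 32) | (u32)sk->sk_type;
}
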
3697  int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
3698  {
3699  	return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
3700  }
3701  
3702  /* Caller should hold socket lock for the passed tipc socket. */
3703  static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
3704  				 struct netlink_callback *cb,
3705  				 struct publication *publ)
3706  {
3707  	void *hdr;
3708  	struct nlattr *attrs;
3709  
3710  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3711  			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
3712  	if (!hdr)
3713  		goto msg_cancel;
3714  
3715  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
3716  	if (!attrs)
3717  		goto genlmsg_cancel;
3718  
3719  	if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
3720  		goto attr_msg_cancel;
3721  	if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->sr.type))
3722  		goto attr_msg_cancel;
3723  	if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->sr.lower))
3724  		goto attr_msg_cancel;
3725  	if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->sr.upper))
3726  		goto attr_msg_cancel;
3727  
3728  	nla_nest_end(skb, attrs);
3729  	genlmsg_end(skb, hdr);
3730  
3731  	return 0;
3732  
3733  attr_msg_cancel:
3734  	nla_nest_cancel(skb, attrs);
3735  genlmsg_cancel:
3736  	genlmsg_cancel(skb, hdr);
3737  msg_cancel:
3738  	return -EMSGSIZE;
3739  }
3740  
3741  /* Caller should hold socket lock for the passed tipc socket. */
3742  static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
3743  				  struct netlink_callback *cb,
3744  				  struct tipc_sock *tsk, u32 *last_publ)
3745  {
3746  	int err;
3747  	struct publication *p;
3748  
3749  	if (*last_publ) {
3750  		list_for_each_entry(p, &tsk->publications, binding_sock) {
3751  			if (p->key == *last_publ)
3752  				break;
3753  		}
3754  		if (list_entry_is_head(p, &tsk->publications, binding_sock)) {
3755  			/* We never set seq or call nl_dump_check_consistent(),
3756  			 * so setting prev_seq here will cause the consistency
3757  			 * check to fail in the netlink callback handler,
3758  			 * resulting in the last NLMSG_DONE message having the
3759  			 * NLM_F_DUMP_INTR flag set.
3760  			 */
3761  			cb->prev_seq = 1;
3762  			*last_publ = 0;
3763  			return -EPIPE;
3764  		}
3765  	} else {
3766  		p = list_first_entry(&tsk->publications, struct publication,
3767  				     binding_sock);
3768  	}
3769  
3770  	list_for_each_entry_from(p, &tsk->publications, binding_sock) {
3771  		err = __tipc_nl_add_sk_publ(skb, cb, p);
3772  		if (err) {
3773  			*last_publ = p->key;
3774  			return err;
3775  		}
3776  	}
3777  	*last_publ = 0;
3778  
3779  	return 0;
3780  }
3781  
3782  int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
3783  {
3784  	int err;
3785  	u32 tsk_portid = cb->args[0];
3786  	u32 last_publ = cb->args[1];
3787  	u32 done = cb->args[2];
3788  	struct net *net = sock_net(skb->sk);
3789  	struct tipc_sock *tsk;
3790  
3791  	if (!tsk_portid) {
3792  		struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
3793  		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3794  
3795  		if (!attrs[TIPC_NLA_SOCK])
3796  			return -EINVAL;
3797  
3798  		err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
3799  						  attrs[TIPC_NLA_SOCK],
3800  						  tipc_nl_sock_policy, NULL);
3801  		if (err)
3802  			return err;
3803  
3804  		if (!sock[TIPC_NLA_SOCK_REF])
3805  			return -EINVAL;
3806  
3807  		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3808  	}
3809  
3810  	if (done)
3811  		return 0;
3812  
3813  	tsk = tipc_sk_lookup(net, tsk_portid);
3814  	if (!tsk)
3815  		return -EINVAL;
3816  
3817  	lock_sock(&tsk->sk);
3818  	err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3819  	if (!err)
3820  		done = 1;
3821  	release_sock(&tsk->sk);
3822  	sock_put(&tsk->sk);
3823  
3824  	cb->args[0] = tsk_portid;
3825  	cb->args[1] = last_publ;
3826  	cb->args[2] = done;
3827  
3828  	return skb->len;
3829  }
3830  
3831  /**
3832   * tipc_sk_filtering - check if a socket should be traced
3833   * @sk: the socket to be examined
3834   *
3835   * @sysctl_tipc_sk_filter is used as the socket tuple for filtering:
3836   * (portid, sock type, name type, name lower, name upper)
3837   *
3838   * Return: true if the socket matches the filter tuple (a field
3839   * value of 0 matches 'any'), or when no tuple is set (all fields 0);
3840   * otherwise false
3841   */
3842  bool tipc_sk_filtering(struct sock *sk)
3843  {
3844  	struct tipc_sock *tsk;
3845  	struct publication *p;
3846  	u32 _port, _sktype, _type, _lower, _upper;
3847  	u32 type = 0, lower = 0, upper = 0;
3848  
3849  	if (!sk)
3850  		return true;
3851  
3852  	tsk = tipc_sk(sk);
3853  
3854  	_port = sysctl_tipc_sk_filter[0];
3855  	_sktype = sysctl_tipc_sk_filter[1];
3856  	_type = sysctl_tipc_sk_filter[2];
3857  	_lower = sysctl_tipc_sk_filter[3];
3858  	_upper = sysctl_tipc_sk_filter[4];
3859  
3860  	if (!_port && !_sktype && !_type && !_lower && !_upper)
3861  		return true;
3862  
3863  	if (_port)
3864  		return (_port == tsk->portid);
3865  
3866  	if (_sktype && _sktype != sk->sk_type)
3867  		return false;
3868  
3869  	if (tsk->published) {
3870  		p = list_first_entry_or_null(&tsk->publications,
3871  					     struct publication, binding_sock);
3872  		if (p) {
3873  			type = p->sr.type;
3874  			lower = p->sr.lower;
3875  			upper = p->sr.upper;
3876  		}
3877  	}
3878  
3879  	if (!tipc_sk_type_connectionless(sk)) {
3880  		type = msg_nametype(&tsk->phdr);
3881  		lower = msg_nameinst(&tsk->phdr);
3882  		upper = lower;
3883  	}
3884  
3885  	if ((_type && _type != type) || (_lower && _lower != lower) ||
3886  	    (_upper && _upper != upper))
3887  		return false;
3888  
3889  	return true;
3890  }
3891  
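/* Example: arming the trace filter checked above from user space, assuming
 * the tuple is exposed as /proc/sys/net/tipc/sk_filter (net/tipc/sysctl.c);
 * a sketch, illustrative only.
 */
#include <stdio.h>

static int set_sk_filter(unsigned long port, unsigned long sktype,
			 unsigned long type, unsigned long lower,
			 unsigned long upper)
{
	FILE *f = fopen("/proc/sys/net/tipc/sk_filter", "w");

	if (!f)
		return -1;
	/* (portid, sock type, name type, name lower, name upper);
	 * a 0 in any position matches "any".
	 */
	fprintf(f, "%lu %lu %lu %lu %lu\n", port, sktype, type, lower, upper);
	return fclose(f);
}
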
3892  u32 tipc_sock_get_portid(struct sock *sk)
3893  {
3894  	return (sk) ? (tipc_sk(sk))->portid : 0;
3895  }
3896  
3897  /**
3898   * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded;
3899   *			both the rcv and backlog queues are considered
3900   * @sk: tipc sk to be checked
3901   * @skb: tipc msg to be checked
3902   *
3903   * Return: true if the socket rx queue allocation is > 90%, otherwise false
3904   */
3905  
3906  bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
3907  {
3908  	atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
3909  	unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
3910  	unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
3911  
3912  	return (qsize > lim * 90 / 100);
3913  }
3914  
3915  /**
3916   * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded;
3917   *			only the rcv queue is considered
3918   * @sk: tipc sk to be checked
3919   * @skb: tipc msg to be checked
3920   *
3921   * Return: true if the socket rx queue allocation is > 90%, otherwise false
3922   */
3923  
3924  bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
3925  {
3926  	unsigned int lim = rcvbuf_limit(sk, skb);
3927  	unsigned int qsize = sk_rmem_alloc_get(sk);
3928  
3929  	return (qsize > lim * 90 / 100);
3930  }
3931  
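/* Example: the 90% threshold both helpers share, with made-up numbers.
 * With lim == 1000, the cutoff is 1000 * 90 / 100 == 900, so a queue of
 * 901 units counts as "about to be overloaded" while 900 does not.
 */
static bool example_overlimit(unsigned int qsize, unsigned int lim)
{
	return qsize > lim * 90 / 100;
}
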
3932  /**
3933   * tipc_sk_dump - dump TIPC socket
3934   * @sk: tipc sk to be dumped
3935   * @dqueues: bitmask selecting which socket queues to dump:
3936   *           - TIPC_DUMP_NONE: don't dump socket queues
3937   *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
3938   *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
3939   *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
3940   *           - TIPC_DUMP_ALL: dump all the socket queues above
3941   * @buf: buffer receiving the formatted dump data
3942   */
3943  int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
3944  {
3945  	int i = 0;
3946  	size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
3947  	u32 conn_type, conn_instance;
3948  	struct tipc_sock *tsk;
3949  	struct publication *p;
3950  	bool tsk_connected;
3951  
3952  	if (!sk) {
3953  		i += scnprintf(buf, sz, "sk data: (null)\n");
3954  		return i;
3955  	}
3956  
3957  	tsk = tipc_sk(sk);
3958  	tsk_connected = !tipc_sk_type_connectionless(sk);
3959  
3960  	i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
3961  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
3962  	i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
3963  	i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
3964  	i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
3965  	if (tsk_connected) {
3966  		i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
3967  		i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
3968  		conn_type = msg_nametype(&tsk->phdr);
3969  		conn_instance = msg_nameinst(&tsk->phdr);
3970  		i += scnprintf(buf + i, sz - i, " %u", conn_type);
3971  		i += scnprintf(buf + i, sz - i, " %u", conn_instance);
3972  	}
3973  	i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
3974  	if (tsk->published) {
3975  		p = list_first_entry_or_null(&tsk->publications,
3976  					     struct publication, binding_sock);
3977  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.type : 0);
3978  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.lower : 0);
3979  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.upper : 0);
3980  	}
3981  	i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
3982  	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
3983  	i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
3984  	i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
3985  	i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
3986  	i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
3987  	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
3988  	i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
3989  	i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
3990  	i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
3991  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
3992  	i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
3993  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
3994  	i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
3995  
3996  	if (dqueues & TIPC_DUMP_SK_SNDQ) {
3997  		i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
3998  		i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
3999  	}
4000  
4001  	if (dqueues & TIPC_DUMP_SK_RCVQ) {
4002  		i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
4003  		i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
4004  	}
4005  
4006  	if (dqueues & TIPC_DUMP_SK_BKLGQ) {
4007  		i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
4008  		i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
4009  		if (sk->sk_backlog.tail != sk->sk_backlog.head) {
4010  			i += scnprintf(buf + i, sz - i, "  tail ");
4011  			i += tipc_skb_dump(sk->sk_backlog.tail, false,
4012  					   buf + i);
4013  		}
4014  	}
4015  
4016  	return i;
4017  }
4018
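/* Example: a hypothetical debug-path caller of tipc_sk_dump(); SK_LMAX
 * (trace.h) is the size the function assumes when any queue dump is
 * requested. A buffer that large does not belong on the stack, which is
 * why this sketch makes it static; illustrative only.
 */
static void example_debug_sk(struct sock *sk)
{
	static char buf[SK_LMAX];	/* static: avoid a huge stack frame */

	tipc_sk_dump(sk, TIPC_DUMP_ALL, buf);
	pr_info("%s", buf);
}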