• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include "ar-internal.h"
17 
18 /*
19  * removal a call's user ID from the socket tree to make the user ID available
20  * again and so that it won't be seen again in association with that call
21  */
rxrpc_remove_user_ID(struct rxrpc_sock * rx,struct rxrpc_call * call)22 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
23 {
24 	_debug("RELEASE CALL %d", call->debug_id);
25 
26 	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
27 		write_lock_bh(&rx->call_lock);
28 		rb_erase(&call->sock_node, &call->socket->calls);
29 		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
30 		write_unlock_bh(&rx->call_lock);
31 	}
32 
33 	read_lock_bh(&call->state_lock);
34 	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
35 	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
36 		rxrpc_queue_call(call);
37 	read_unlock_bh(&call->state_lock);
38 }
39 
40 /*
41  * receive a message from an RxRPC socket
42  * - we need to be careful about two or more threads calling recvmsg
43  *   simultaneously
44  */
rxrpc_recvmsg(struct kiocb * iocb,struct socket * sock,struct msghdr * msg,size_t len,int flags)45 int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
46 		  struct msghdr *msg, size_t len, int flags)
47 {
48 	struct rxrpc_skb_priv *sp;
49 	struct rxrpc_call *call = NULL, *continue_call = NULL;
50 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
51 	struct sk_buff *skb;
52 	long timeo;
53 	int copy, ret, ullen, offset, copied = 0;
54 	u32 abort_code;
55 
56 	DEFINE_WAIT(wait);
57 
58 	_enter(",,,%zu,%d", len, flags);
59 
60 	if (flags & (MSG_OOB | MSG_TRUNC))
61 		return -EOPNOTSUPP;
62 
63 	ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
64 
65 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
66 	msg->msg_flags |= MSG_MORE;
67 
68 	lock_sock(&rx->sk);
69 
70 	for (;;) {
71 		/* return immediately if a client socket has no outstanding
72 		 * calls */
73 		if (RB_EMPTY_ROOT(&rx->calls)) {
74 			if (copied)
75 				goto out;
76 			if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
77 				release_sock(&rx->sk);
78 				if (continue_call)
79 					rxrpc_put_call(continue_call);
80 				return -ENODATA;
81 			}
82 		}
83 
84 		/* get the next message on the Rx queue */
85 		skb = skb_peek(&rx->sk.sk_receive_queue);
86 		if (!skb) {
87 			/* nothing remains on the queue */
88 			if (copied &&
89 			    (msg->msg_flags & MSG_PEEK || timeo == 0))
90 				goto out;
91 
92 			/* wait for a message to turn up */
93 			release_sock(&rx->sk);
94 			prepare_to_wait_exclusive(rx->sk.sk_sleep, &wait,
95 						  TASK_INTERRUPTIBLE);
96 			ret = sock_error(&rx->sk);
97 			if (ret)
98 				goto wait_error;
99 
100 			if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
101 				if (signal_pending(current))
102 					goto wait_interrupted;
103 				timeo = schedule_timeout(timeo);
104 			}
105 			finish_wait(rx->sk.sk_sleep, &wait);
106 			lock_sock(&rx->sk);
107 			continue;
108 		}
109 
110 	peek_next_packet:
111 		sp = rxrpc_skb(skb);
112 		call = sp->call;
113 		ASSERT(call != NULL);
114 
115 		_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
116 
117 		/* make sure we wait for the state to be updated in this call */
118 		spin_lock_bh(&call->lock);
119 		spin_unlock_bh(&call->lock);
120 
121 		if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
122 			_debug("packet from released call");
123 			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
124 				BUG();
125 			rxrpc_free_skb(skb);
126 			continue;
127 		}
128 
129 		/* determine whether to continue last data receive */
130 		if (continue_call) {
131 			_debug("maybe cont");
132 			if (call != continue_call ||
133 			    skb->mark != RXRPC_SKB_MARK_DATA) {
134 				release_sock(&rx->sk);
135 				rxrpc_put_call(continue_call);
136 				_leave(" = %d [noncont]", copied);
137 				return copied;
138 			}
139 		}
140 
141 		rxrpc_get_call(call);
142 
143 		/* copy the peer address and timestamp */
144 		if (!continue_call) {
145 			if (msg->msg_name && msg->msg_namelen > 0)
146 				memcpy(msg->msg_name,
147 				       &call->conn->trans->peer->srx,
148 				       sizeof(call->conn->trans->peer->srx));
149 			sock_recv_timestamp(msg, &rx->sk, skb);
150 		}
151 
152 		/* receive the message */
153 		if (skb->mark != RXRPC_SKB_MARK_DATA)
154 			goto receive_non_data_message;
155 
156 		_debug("recvmsg DATA #%u { %d, %d }",
157 		       ntohl(sp->hdr.seq), skb->len, sp->offset);
158 
159 		if (!continue_call) {
160 			/* only set the control data once per recvmsg() */
161 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
162 				       ullen, &call->user_call_ID);
163 			if (ret < 0)
164 				goto copy_error;
165 			ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
166 		}
167 
168 		ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
169 		ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
170 		call->rx_data_recv = ntohl(sp->hdr.seq);
171 
172 		ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
173 
174 		offset = sp->offset;
175 		copy = skb->len - offset;
176 		if (copy > len - copied)
177 			copy = len - copied;
178 
179 		if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
180 			ret = skb_copy_datagram_iovec(skb, offset,
181 						      msg->msg_iov, copy);
182 		} else {
183 			ret = skb_copy_and_csum_datagram_iovec(skb, offset,
184 							       msg->msg_iov);
185 			if (ret == -EINVAL)
186 				goto csum_copy_error;
187 		}
188 
189 		if (ret < 0)
190 			goto copy_error;
191 
192 		/* handle piecemeal consumption of data packets */
193 		_debug("copied %d+%d", copy, copied);
194 
195 		offset += copy;
196 		copied += copy;
197 
198 		if (!(flags & MSG_PEEK))
199 			sp->offset = offset;
200 
201 		if (sp->offset < skb->len) {
202 			_debug("buffer full");
203 			ASSERTCMP(copied, ==, len);
204 			break;
205 		}
206 
207 		/* we transferred the whole data packet */
208 		if (sp->hdr.flags & RXRPC_LAST_PACKET) {
209 			_debug("last");
210 			if (call->conn->out_clientflag) {
211 				 /* last byte of reply received */
212 				ret = copied;
213 				goto terminal_message;
214 			}
215 
216 			/* last bit of request received */
217 			if (!(flags & MSG_PEEK)) {
218 				_debug("eat packet");
219 				if (skb_dequeue(&rx->sk.sk_receive_queue) !=
220 				    skb)
221 					BUG();
222 				rxrpc_free_skb(skb);
223 			}
224 			msg->msg_flags &= ~MSG_MORE;
225 			break;
226 		}
227 
228 		/* move on to the next data message */
229 		_debug("next");
230 		if (!continue_call)
231 			continue_call = sp->call;
232 		else
233 			rxrpc_put_call(call);
234 		call = NULL;
235 
236 		if (flags & MSG_PEEK) {
237 			_debug("peek next");
238 			skb = skb->next;
239 			if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
240 				break;
241 			goto peek_next_packet;
242 		}
243 
244 		_debug("eat packet");
245 		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
246 			BUG();
247 		rxrpc_free_skb(skb);
248 	}
249 
250 	/* end of non-terminal data packet reception for the moment */
251 	_debug("end rcv data");
252 out:
253 	release_sock(&rx->sk);
254 	if (call)
255 		rxrpc_put_call(call);
256 	if (continue_call)
257 		rxrpc_put_call(continue_call);
258 	_leave(" = %d [data]", copied);
259 	return copied;
260 
261 	/* handle non-DATA messages such as aborts, incoming connections and
262 	 * final ACKs */
263 receive_non_data_message:
264 	_debug("non-data");
265 
266 	if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
267 		_debug("RECV NEW CALL");
268 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
269 		if (ret < 0)
270 			goto copy_error;
271 		if (!(flags & MSG_PEEK)) {
272 			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
273 				BUG();
274 			rxrpc_free_skb(skb);
275 		}
276 		goto out;
277 	}
278 
279 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
280 		       ullen, &call->user_call_ID);
281 	if (ret < 0)
282 		goto copy_error;
283 	ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
284 
285 	switch (skb->mark) {
286 	case RXRPC_SKB_MARK_DATA:
287 		BUG();
288 	case RXRPC_SKB_MARK_FINAL_ACK:
289 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
290 		break;
291 	case RXRPC_SKB_MARK_BUSY:
292 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
293 		break;
294 	case RXRPC_SKB_MARK_REMOTE_ABORT:
295 		abort_code = call->abort_code;
296 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
297 		break;
298 	case RXRPC_SKB_MARK_NET_ERROR:
299 		_debug("RECV NET ERROR %d", sp->error);
300 		abort_code = sp->error;
301 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
302 		break;
303 	case RXRPC_SKB_MARK_LOCAL_ERROR:
304 		_debug("RECV LOCAL ERROR %d", sp->error);
305 		abort_code = sp->error;
306 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
307 			       &abort_code);
308 		break;
309 	default:
310 		BUG();
311 		break;
312 	}
313 
314 	if (ret < 0)
315 		goto copy_error;
316 
317 terminal_message:
318 	_debug("terminal");
319 	msg->msg_flags &= ~MSG_MORE;
320 	msg->msg_flags |= MSG_EOR;
321 
322 	if (!(flags & MSG_PEEK)) {
323 		_net("free terminal skb %p", skb);
324 		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
325 			BUG();
326 		rxrpc_free_skb(skb);
327 		rxrpc_remove_user_ID(rx, call);
328 	}
329 
330 	release_sock(&rx->sk);
331 	rxrpc_put_call(call);
332 	if (continue_call)
333 		rxrpc_put_call(continue_call);
334 	_leave(" = %d", ret);
335 	return ret;
336 
337 copy_error:
338 	_debug("copy error");
339 	release_sock(&rx->sk);
340 	rxrpc_put_call(call);
341 	if (continue_call)
342 		rxrpc_put_call(continue_call);
343 	_leave(" = %d", ret);
344 	return ret;
345 
346 csum_copy_error:
347 	_debug("csum error");
348 	release_sock(&rx->sk);
349 	if (continue_call)
350 		rxrpc_put_call(continue_call);
351 	rxrpc_kill_skb(skb);
352 	skb_kill_datagram(&rx->sk, skb, flags);
353 	rxrpc_put_call(call);
354 	return -EAGAIN;
355 
356 wait_interrupted:
357 	ret = sock_intr_errno(timeo);
358 wait_error:
359 	finish_wait(rx->sk.sk_sleep, &wait);
360 	if (continue_call)
361 		rxrpc_put_call(continue_call);
362 	if (copied)
363 		copied = ret;
364 	_leave(" = %d [waitfail %d]", copied, ret);
365 	return copied;
366 
367 }
368 
369 /**
370  * rxrpc_kernel_data_delivered - Record delivery of data message
371  * @skb: Message holding data
372  *
373  * Record the delivery of a data message.  This permits RxRPC to keep its
374  * tracking correct.  The socket buffer will be deleted.
375  */
rxrpc_kernel_data_delivered(struct sk_buff * skb)376 void rxrpc_kernel_data_delivered(struct sk_buff *skb)
377 {
378 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
379 	struct rxrpc_call *call = sp->call;
380 
381 	ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
382 	ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
383 	call->rx_data_recv = ntohl(sp->hdr.seq);
384 
385 	ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
386 	rxrpc_free_skb(skb);
387 }
388 
389 EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
390 
391 /**
392  * rxrpc_kernel_is_data_last - Determine if data message is last one
393  * @skb: Message holding data
394  *
395  * Determine if data message is last one for the parent call.
396  */
rxrpc_kernel_is_data_last(struct sk_buff * skb)397 bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
398 {
399 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
400 
401 	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
402 
403 	return sp->hdr.flags & RXRPC_LAST_PACKET;
404 }
405 
406 EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
407 
408 /**
409  * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
410  * @skb: Message indicating an abort
411  *
412  * Get the abort code from an RxRPC abort message.
413  */
rxrpc_kernel_get_abort_code(struct sk_buff * skb)414 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
415 {
416 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
417 
418 	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
419 
420 	return sp->call->abort_code;
421 }
422 
423 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
424 
425 /**
426  * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
427  * @skb: Message indicating an error
428  *
429  * Get the error number from an RxRPC error message.
430  */
rxrpc_kernel_get_error_number(struct sk_buff * skb)431 int rxrpc_kernel_get_error_number(struct sk_buff *skb)
432 {
433 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
434 
435 	return sp->error;
436 }
437 
438 EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
439