1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41 
42 /*
43  * verbs.c
44  *
45  * Encapsulates the major functions managing:
46  *  o adapters
47  *  o endpoints
48  *  o connections
49  *  o buffer memory
50  */
51 
52 #include <linux/interrupt.h>
53 #include <linux/slab.h>
54 #include <linux/sunrpc/addr.h>
55 #include <linux/sunrpc/svc_rdma.h>
56 #include <linux/log2.h>
57 
58 #include <asm-generic/barrier.h>
59 #include <asm/bitops.h>
60 
61 #include <rdma/ib_cm.h>
62 
63 #include "xprt_rdma.h"
64 #include <trace/events/rpcrdma.h>
65 
66 /*
67  * Globals/Macros
68  */
69 
70 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
71 # define RPCDBG_FACILITY	RPCDBG_TRANS
72 #endif
73 
74 /*
75  * internal functions
76  */
77 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
78 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
79 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
80 				       struct rpcrdma_sendctx *sc);
81 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
82 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
83 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
84 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
85 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
86 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
87 static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
88 static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
89 static struct rpcrdma_regbuf *
90 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
91 		     gfp_t flags);
92 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
93 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
94 
95 /* Wait for outstanding transport work to finish. ib_drain_qp
96  * handles the drains in the wrong order for us, so open code
97  * them here.
98  */
99 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
100 {
101 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
102 	struct rdma_cm_id *id = ep->re_id;
103 
104 	/* Wait for rpcrdma_post_recvs() to leave its critical
105 	 * section.
106 	 */
107 	if (atomic_inc_return(&ep->re_receiving) > 1)
108 		wait_for_completion(&ep->re_done);
109 
110 	/* Flush Receives, then wait for deferred Reply work
111 	 * to complete.
112 	 */
113 	ib_drain_rq(id->qp);
114 
115 	/* Deferred Reply processing might have scheduled
116 	 * local invalidations.
117 	 */
118 	ib_drain_sq(id->qp);
119 
120 	rpcrdma_ep_put(ep);
121 }
122 
123 /* Ensure xprt_force_disconnect() is invoked exactly once when a
124  * connection is closed or lost. (The important thing is that it
125  * is invoked "at least" once).
126  */
127 void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
128 {
129 	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
130 		xprt_force_disconnect(ep->re_xprt);
131 }
132 
133 /**
134  * rpcrdma_flush_disconnect - Disconnect on flushed completion
135  * @r_xprt: transport to disconnect
136  * @wc: work completion entry
137  *
138  * Must be called in process context.
139  */
140 void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
141 {
142 	if (wc->status != IB_WC_SUCCESS)
143 		rpcrdma_force_disconnect(r_xprt->rx_ep);
144 }
145 
146 /**
147  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
148  * @cq:	completion queue
149  * @wc:	WCE for a completed Send WR
150  *
151  */
152 static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
153 {
154 	struct ib_cqe *cqe = wc->wr_cqe;
155 	struct rpcrdma_sendctx *sc =
156 		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
157 	struct rpcrdma_xprt *r_xprt = cq->cq_context;
158 
159 	/* WARNING: Only wr_cqe and status are reliable at this point */
160 	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
161 	rpcrdma_sendctx_put_locked(r_xprt, sc);
162 	rpcrdma_flush_disconnect(r_xprt, wc);
163 }
164 
165 /**
166  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
167  * @cq:	completion queue
168  * @wc:	WCE for a completed Receive WR
169  *
170  */
171 static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
172 {
173 	struct ib_cqe *cqe = wc->wr_cqe;
174 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
175 					       rr_cqe);
176 	struct rpcrdma_xprt *r_xprt = cq->cq_context;
177 
178 	/* WARNING: Only wr_cqe and status are reliable at this point */
179 	trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
180 	--r_xprt->rx_ep->re_receive_count;
181 	if (wc->status != IB_WC_SUCCESS)
182 		goto out_flushed;
183 
184 	/* status == SUCCESS means all fields in wc are trustworthy */
185 	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
186 	rep->rr_wc_flags = wc->wc_flags;
187 	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
188 
189 	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
190 				   rdmab_addr(rep->rr_rdmabuf),
191 				   wc->byte_len, DMA_FROM_DEVICE);
192 
193 	rpcrdma_reply_handler(rep);
194 	return;
195 
196 out_flushed:
197 	rpcrdma_flush_disconnect(r_xprt, wc);
198 	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
199 }
200 
201 static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
202 				      struct rdma_conn_param *param)
203 {
204 	const struct rpcrdma_connect_private *pmsg = param->private_data;
205 	unsigned int rsize, wsize;
206 
207 	/* Default settings for RPC-over-RDMA Version One */
208 	ep->re_implicit_roundup = xprt_rdma_pad_optimize;
209 	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
210 	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
211 
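	/* If the peer sent a valid RPC-over-RDMA private message,
	 * use the inline thresholds it advertised instead of the
	 * Version One defaults.
	 */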
212 	if (pmsg &&
213 	    pmsg->cp_magic == rpcrdma_cmp_magic &&
214 	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
215 		ep->re_implicit_roundup = true;
216 		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
217 		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
218 	}
219 
220 	if (rsize < ep->re_inline_recv)
221 		ep->re_inline_recv = rsize;
222 	if (wsize < ep->re_inline_send)
223 		ep->re_inline_send = wsize;
224 
225 	rpcrdma_set_max_header_sizes(ep);
226 }
227 
228 /**
229  * rpcrdma_cm_event_handler - Handle RDMA CM events
230  * @id: rdma_cm_id on which an event has occurred
231  * @event: details of the event
232  *
233  * Called with @id's mutex held. Returns 1 if caller should
234  * destroy @id, otherwise 0.
235  */
236 static int
237 rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
238 {
239 	struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
240 	struct rpcrdma_ep *ep = id->context;
241 
242 	might_sleep();
243 
244 	switch (event->event) {
245 	case RDMA_CM_EVENT_ADDR_RESOLVED:
246 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
247 		ep->re_async_rc = 0;
248 		complete(&ep->re_done);
249 		return 0;
250 	case RDMA_CM_EVENT_ADDR_ERROR:
251 		ep->re_async_rc = -EPROTO;
252 		complete(&ep->re_done);
253 		return 0;
254 	case RDMA_CM_EVENT_ROUTE_ERROR:
255 		ep->re_async_rc = -ENETUNREACH;
256 		complete(&ep->re_done);
257 		return 0;
258 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
259 		pr_info("rpcrdma: removing device %s for %pISpc\n",
260 			ep->re_id->device->name, sap);
261 		fallthrough;
262 	case RDMA_CM_EVENT_ADDR_CHANGE:
263 		ep->re_connect_status = -ENODEV;
264 		goto disconnected;
265 	case RDMA_CM_EVENT_ESTABLISHED:
266 		rpcrdma_ep_get(ep);
267 		ep->re_connect_status = 1;
268 		rpcrdma_update_cm_private(ep, &event->param.conn);
269 		trace_xprtrdma_inline_thresh(ep);
270 		wake_up_all(&ep->re_connect_wait);
271 		break;
272 	case RDMA_CM_EVENT_CONNECT_ERROR:
273 		ep->re_connect_status = -ENOTCONN;
274 		goto wake_connect_worker;
275 	case RDMA_CM_EVENT_UNREACHABLE:
276 		ep->re_connect_status = -ENETUNREACH;
277 		goto wake_connect_worker;
278 	case RDMA_CM_EVENT_REJECTED:
279 		dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
280 			sap, rdma_reject_msg(id, event->status));
281 		ep->re_connect_status = -ECONNREFUSED;
282 		if (event->status == IB_CM_REJ_STALE_CONN)
283 			ep->re_connect_status = -ENOTCONN;
284 wake_connect_worker:
285 		wake_up_all(&ep->re_connect_wait);
286 		return 0;
287 	case RDMA_CM_EVENT_DISCONNECTED:
288 		ep->re_connect_status = -ECONNABORTED;
289 disconnected:
290 		rpcrdma_force_disconnect(ep);
291 		return rpcrdma_ep_put(ep);
292 	default:
293 		break;
294 	}
295 
296 	dprintk("RPC:       %s: %pISpc on %s/frwr: %s\n", __func__, sap,
297 		ep->re_id->device->name, rdma_event_msg(event->event));
298 	return 0;
299 }
300 
301 static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
302 					    struct rpcrdma_ep *ep)
303 {
304 	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
305 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
306 	struct rdma_cm_id *id;
307 	int rc;
308 
309 	init_completion(&ep->re_done);
310 
311 	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
312 			    RDMA_PS_TCP, IB_QPT_RC);
313 	if (IS_ERR(id))
314 		return id;
315 
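	/* Each resolution step completes asynchronously: the CM event
	 * handler records the result in re_async_rc and completes
	 * re_done. The -ETIMEDOUT preset is what remains if the wait
	 * below expires before an event arrives.
	 */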
316 	ep->re_async_rc = -ETIMEDOUT;
317 	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
318 			       RDMA_RESOLVE_TIMEOUT);
319 	if (rc)
320 		goto out;
321 	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
322 	if (rc < 0)
323 		goto out;
324 
325 	rc = ep->re_async_rc;
326 	if (rc)
327 		goto out;
328 
329 	ep->re_async_rc = -ETIMEDOUT;
330 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
331 	if (rc)
332 		goto out;
333 	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
334 	if (rc < 0)
335 		goto out;
336 	rc = ep->re_async_rc;
337 	if (rc)
338 		goto out;
339 
340 	return id;
341 
342 out:
343 	rdma_destroy_id(id);
344 	return ERR_PTR(rc);
345 }
346 
347 static void rpcrdma_ep_destroy(struct kref *kref)
348 {
349 	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
350 
351 	if (ep->re_id->qp) {
352 		rdma_destroy_qp(ep->re_id);
353 		ep->re_id->qp = NULL;
354 	}
355 
356 	if (ep->re_attr.recv_cq)
357 		ib_free_cq(ep->re_attr.recv_cq);
358 	ep->re_attr.recv_cq = NULL;
359 	if (ep->re_attr.send_cq)
360 		ib_free_cq(ep->re_attr.send_cq);
361 	ep->re_attr.send_cq = NULL;
362 
363 	if (ep->re_pd)
364 		ib_dealloc_pd(ep->re_pd);
365 	ep->re_pd = NULL;
366 
367 	kfree(ep);
368 	module_put(THIS_MODULE);
369 }
370 
371 static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
372 {
373 	kref_get(&ep->re_kref);
374 }
375 
376 /* Returns:
377  *     %0 if @ep still has a positive kref count, or
378  *     %1 if @ep was destroyed successfully.
379  */
380 static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
381 {
382 	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
383 }
384 
385 static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
386 {
387 	struct rpcrdma_connect_private *pmsg;
388 	struct ib_device *device;
389 	struct rdma_cm_id *id;
390 	struct rpcrdma_ep *ep;
391 	int rc;
392 
393 	ep = kzalloc(sizeof(*ep), GFP_NOFS);
394 	if (!ep)
395 		return -ENOTCONN;
396 	ep->re_xprt = &r_xprt->rx_xprt;
397 	kref_init(&ep->re_kref);
398 
399 	id = rpcrdma_create_id(r_xprt, ep);
400 	if (IS_ERR(id)) {
401 		kfree(ep);
402 		return PTR_ERR(id);
403 	}
404 	__module_get(THIS_MODULE);
405 	device = id->device;
406 	ep->re_id = id;
407 	reinit_completion(&ep->re_done);
408 
409 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
410 	ep->re_inline_send = xprt_rdma_max_inline_write;
411 	ep->re_inline_recv = xprt_rdma_max_inline_read;
412 	rc = frwr_query_device(ep, device);
413 	if (rc)
414 		goto out_destroy;
415 
416 	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
417 
418 	ep->re_attr.srq = NULL;
419 	ep->re_attr.cap.max_inline_data = 0;
420 	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
421 	ep->re_attr.qp_type = IB_QPT_RC;
422 	ep->re_attr.port_num = ~0;
423 
424 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
425 		"iovs: send %d recv %d\n",
426 		__func__,
427 		ep->re_attr.cap.max_send_wr,
428 		ep->re_attr.cap.max_recv_wr,
429 		ep->re_attr.cap.max_send_sge,
430 		ep->re_attr.cap.max_recv_sge);
431 
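	/* re_send_batch controls how often a posted Send is signaled.
	 * Unsignaled Sends reduce completion overhead; their sendctxs
	 * are reclaimed later by rpcrdma_sendctx_put_locked().
	 */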
432 	ep->re_send_batch = ep->re_max_requests >> 3;
433 	ep->re_send_count = ep->re_send_batch;
434 	init_waitqueue_head(&ep->re_connect_wait);
435 
436 	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
437 					      ep->re_attr.cap.max_send_wr,
438 					      IB_POLL_WORKQUEUE);
439 	if (IS_ERR(ep->re_attr.send_cq)) {
440 		rc = PTR_ERR(ep->re_attr.send_cq);
441 		ep->re_attr.send_cq = NULL;
442 		goto out_destroy;
443 	}
444 
445 	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
446 					      ep->re_attr.cap.max_recv_wr,
447 					      IB_POLL_WORKQUEUE);
448 	if (IS_ERR(ep->re_attr.recv_cq)) {
449 		rc = PTR_ERR(ep->re_attr.recv_cq);
450 		ep->re_attr.recv_cq = NULL;
451 		goto out_destroy;
452 	}
453 	ep->re_receive_count = 0;
454 
455 	/* Initialize cma parameters */
456 	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
457 
458 	/* Prepare RDMA-CM private message */
459 	pmsg = &ep->re_cm_private;
460 	pmsg->cp_magic = rpcrdma_cmp_magic;
461 	pmsg->cp_version = RPCRDMA_CMP_VERSION;
462 	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
463 	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
464 	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
465 	ep->re_remote_cma.private_data = pmsg;
466 	ep->re_remote_cma.private_data_len = sizeof(*pmsg);
467 
468 	/* Client offers RDMA Read but does not initiate */
469 	ep->re_remote_cma.initiator_depth = 0;
470 	ep->re_remote_cma.responder_resources =
471 		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
472 
473 	/* Limit transport retries so client can detect server
474 	 * GID changes quickly. RPC layer handles re-establishing
475 	 * transport connection and retransmission.
476 	 */
477 	ep->re_remote_cma.retry_count = 6;
478 
479 	/* RPC-over-RDMA handles its own flow control. In addition,
480 	 * make all RNR NAKs visible so we know that RPC-over-RDMA
481 	 * flow control is working correctly (no NAKs should be seen).
482 	 */
483 	ep->re_remote_cma.flow_control = 0;
484 	ep->re_remote_cma.rnr_retry_count = 0;
485 
486 	ep->re_pd = ib_alloc_pd(device, 0);
487 	if (IS_ERR(ep->re_pd)) {
488 		rc = PTR_ERR(ep->re_pd);
489 		ep->re_pd = NULL;
490 		goto out_destroy;
491 	}
492 
493 	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
494 	if (rc)
495 		goto out_destroy;
496 
497 	r_xprt->rx_ep = ep;
498 	return 0;
499 
500 out_destroy:
501 	rpcrdma_ep_put(ep);
502 	rdma_destroy_id(id);
503 	return rc;
504 }
505 
506 /**
507  * rpcrdma_xprt_connect - Connect an unconnected transport
508  * @r_xprt: controlling transport instance
509  *
510  * Returns 0 on success or a negative errno.
511  */
512 int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
513 {
514 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
515 	struct rpcrdma_ep *ep;
516 	int rc;
517 
518 	rc = rpcrdma_ep_create(r_xprt);
519 	if (rc)
520 		return rc;
521 	ep = r_xprt->rx_ep;
522 
523 	xprt_clear_connected(xprt);
524 	rpcrdma_reset_cwnd(r_xprt);
525 
526 	/* Bump the ep's reference count while there are
527 	 * outstanding Receives.
528 	 */
529 	rpcrdma_ep_get(ep);
530 	rpcrdma_post_recvs(r_xprt, 1, true);
531 
532 	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
533 	if (rc)
534 		goto out;
535 
536 	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
537 		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
538 	wait_event_interruptible(ep->re_connect_wait,
539 				 ep->re_connect_status != 0);
540 	if (ep->re_connect_status <= 0) {
541 		rc = ep->re_connect_status;
542 		goto out;
543 	}
544 
545 	rc = rpcrdma_sendctxs_create(r_xprt);
546 	if (rc) {
547 		rc = -ENOTCONN;
548 		goto out;
549 	}
550 
551 	rc = rpcrdma_reqs_setup(r_xprt);
552 	if (rc) {
553 		rc = -ENOTCONN;
554 		goto out;
555 	}
556 	rpcrdma_mrs_create(r_xprt);
557 
558 out:
559 	trace_xprtrdma_connect(r_xprt, rc);
560 	return rc;
561 }
562 
563 /**
564  * rpcrdma_xprt_disconnect - Disconnect underlying transport
565  * @r_xprt: controlling transport instance
566  *
567  * Caller serializes. Either the transport send lock is held,
568  * or we're being called to destroy the transport.
569  *
570  * On return, @r_xprt is completely divested of all hardware
571  * resources and prepared for the next ->connect operation.
572  */
573 void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
574 {
575 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
576 	struct rdma_cm_id *id;
577 	int rc;
578 
579 	if (!ep)
580 		return;
581 
582 	id = ep->re_id;
583 	rc = rdma_disconnect(id);
584 	trace_xprtrdma_disconnect(r_xprt, rc);
585 
586 	rpcrdma_xprt_drain(r_xprt);
587 	rpcrdma_reps_unmap(r_xprt);
588 	rpcrdma_reqs_reset(r_xprt);
589 	rpcrdma_mrs_destroy(r_xprt);
590 	rpcrdma_sendctxs_destroy(r_xprt);
591 
592 	if (rpcrdma_ep_put(ep))
593 		rdma_destroy_id(id);
594 
595 	r_xprt->rx_ep = NULL;
596 }
597 
598 /* Fixed-size circular FIFO queue. This implementation is wait-free and
599  * lock-free.
600  *
601  * Consumer is the code path that posts Sends. This path dequeues a
602  * sendctx for use by a Send operation. Multiple consumer threads
603  * are serialized by the RPC transport lock, which allows only one
604  * ->send_request call at a time.
605  *
606  * Producer is the code path that handles Send completions. This path
607  * enqueues a sendctx that has been completed. Multiple producer
608  * threads are serialized by the ib_poll_cq() function.
609  */
610 
611 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
612  * queue activity, and rpcrdma_xprt_drain has flushed all remaining
613  * Send requests.
614  */
615 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
616 {
617 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
618 	unsigned long i;
619 
620 	if (!buf->rb_sc_ctxs)
621 		return;
622 	for (i = 0; i <= buf->rb_sc_last; i++)
623 		kfree(buf->rb_sc_ctxs[i]);
624 	kfree(buf->rb_sc_ctxs);
625 	buf->rb_sc_ctxs = NULL;
626 }
627 
628 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
629 {
630 	struct rpcrdma_sendctx *sc;
631 
632 	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
633 		     GFP_KERNEL);
634 	if (!sc)
635 		return NULL;
636 
637 	sc->sc_cqe.done = rpcrdma_wc_send;
638 	sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
639 	sc->sc_cid.ci_completion_id =
640 		atomic_inc_return(&ep->re_completion_ids);
641 	return sc;
642 }
643 
644 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
645 {
646 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
647 	struct rpcrdma_sendctx *sc;
648 	unsigned long i;
649 
650 	/* Maximum number of concurrent outstanding Send WRs. Capping
651 	 * the circular queue size stops Send Queue overflow by causing
652 	 * the ->send_request call to fail temporarily before too many
653 	 * Sends are posted.
654 	 */
655 	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
656 	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
657 	if (!buf->rb_sc_ctxs)
658 		return -ENOMEM;
659 
660 	buf->rb_sc_last = i - 1;
661 	for (i = 0; i <= buf->rb_sc_last; i++) {
662 		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
663 		if (!sc)
664 			return -ENOMEM;
665 
666 		buf->rb_sc_ctxs[i] = sc;
667 	}
668 
669 	buf->rb_sc_head = 0;
670 	buf->rb_sc_tail = 0;
671 	return 0;
672 }
673 
674 /* The sendctx queue is not guaranteed to have a size that is a
675  * power of two, thus the helpers in circ_buf.h cannot be used.
676  * The other option is to use modulus (%), which can be expensive.
677  */
678 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
679 					  unsigned long item)
680 {
681 	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
682 }
683 
684 /**
685  * rpcrdma_sendctx_get_locked - Acquire a send context
686  * @r_xprt: controlling transport instance
687  *
688  * Returns pointer to a free send completion context; or NULL if
689  * the queue is empty.
690  *
691  * Usage: Called to acquire an SGE array before preparing a Send WR.
692  *
693  * The caller serializes calls to this function (per transport), and
694  * provides an effective memory barrier that flushes the new value
695  * of rb_sc_head.
696  */
697 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
698 {
699 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
700 	struct rpcrdma_sendctx *sc;
701 	unsigned long next_head;
702 
703 	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
704 
705 	if (next_head == READ_ONCE(buf->rb_sc_tail))
706 		goto out_emptyq;
707 
708 	/* ORDER: item must be accessed _before_ head is updated */
709 	sc = buf->rb_sc_ctxs[next_head];
710 
711 	/* Releasing the lock in the caller acts as a memory
712 	 * barrier that flushes rb_sc_head.
713 	 */
714 	buf->rb_sc_head = next_head;
715 
716 	return sc;
717 
718 out_emptyq:
719 	/* The queue is "empty" if there have not been enough Send
720 	 * completions recently. This is a sign the Send Queue is
721 	 * backing up. Cause the caller to pause and try again.
722 	 */
723 	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
724 	r_xprt->rx_stats.empty_sendctx_q++;
725 	return NULL;
726 }
727 
728 /**
729  * rpcrdma_sendctx_put_locked - Release a send context
730  * @r_xprt: controlling transport instance
731  * @sc: send context to release
732  *
733  * Usage: Called from Send completion to return a sendctx
734  * to the queue.
735  *
736  * The caller serializes calls to this function (per transport).
737  */
738 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
739 				       struct rpcrdma_sendctx *sc)
740 {
741 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
742 	unsigned long next_tail;
743 
744 	/* Unmap SGEs of previously completed but unsignaled
745 	 * Sends by walking up the queue until @sc is found.
746 	 */
747 	next_tail = buf->rb_sc_tail;
748 	do {
749 		next_tail = rpcrdma_sendctx_next(buf, next_tail);
750 
751 		/* ORDER: item must be accessed _before_ tail is updated */
752 		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
753 
754 	} while (buf->rb_sc_ctxs[next_tail] != sc);
755 
756 	/* Paired with READ_ONCE */
757 	smp_store_release(&buf->rb_sc_tail, next_tail);
758 
759 	xprt_write_space(&r_xprt->rx_xprt);
760 }
761 
762 static void
763 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
764 {
765 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
766 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
767 	unsigned int count;
768 
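	/* Allocate MRs in batches of up to re_max_rdma_segs. The
	 * refresh worker calls back in here when more MRs are needed.
	 */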
769 	for (count = 0; count < ep->re_max_rdma_segs; count++) {
770 		struct rpcrdma_mr *mr;
771 		int rc;
772 
773 		mr = kzalloc(sizeof(*mr), GFP_NOFS);
774 		if (!mr)
775 			break;
776 
777 		rc = frwr_mr_init(r_xprt, mr);
778 		if (rc) {
779 			kfree(mr);
780 			break;
781 		}
782 
783 		spin_lock(&buf->rb_lock);
784 		rpcrdma_mr_push(mr, &buf->rb_mrs);
785 		list_add(&mr->mr_all, &buf->rb_all_mrs);
786 		spin_unlock(&buf->rb_lock);
787 	}
788 
789 	r_xprt->rx_stats.mrs_allocated += count;
790 	trace_xprtrdma_createmrs(r_xprt, count);
791 }
792 
793 static void
794 rpcrdma_mr_refresh_worker(struct work_struct *work)
795 {
796 	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
797 						  rb_refresh_worker);
798 	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
799 						   rx_buf);
800 
801 	rpcrdma_mrs_create(r_xprt);
802 	xprt_write_space(&r_xprt->rx_xprt);
803 }
804 
805 /**
806  * rpcrdma_mrs_refresh - Wake the MR refresh worker
807  * @r_xprt: controlling transport instance
808  *
809  */
810 void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
811 {
812 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
813 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
814 
815 	/* If there is no underlying connection, it's no use
816 	 * to wake the refresh worker.
817 	 */
818 	if (ep->re_connect_status == 1) {
819 		/* The work is scheduled on a WQ_MEM_RECLAIM
820 		 * workqueue in order to prevent MR allocation
821 		 * from recursing into NFS during direct reclaim.
822 		 */
823 		queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
824 	}
825 }
826 
827 /**
828  * rpcrdma_req_create - Allocate an rpcrdma_req object
829  * @r_xprt: controlling r_xprt
830  * @size: initial size, in bytes, of send and receive buffers
831  * @flags: GFP flags passed to memory allocators
832  *
833  * Returns an allocated and fully initialized rpcrdma_req or NULL.
834  */
835 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
836 				       gfp_t flags)
837 {
838 	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
839 	struct rpcrdma_req *req;
840 
841 	req = kzalloc(sizeof(*req), flags);
842 	if (req == NULL)
843 		goto out1;
844 
845 	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
846 	if (!req->rl_sendbuf)
847 		goto out2;
848 
849 	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
850 	if (!req->rl_recvbuf)
851 		goto out3;
852 
853 	INIT_LIST_HEAD(&req->rl_free_mrs);
854 	INIT_LIST_HEAD(&req->rl_registered);
855 	spin_lock(&buffer->rb_lock);
856 	list_add(&req->rl_all, &buffer->rb_allreqs);
857 	spin_unlock(&buffer->rb_lock);
858 	return req;
859 
860 out3:
861 	rpcrdma_regbuf_free(req->rl_sendbuf);
862 out2:
863 	kfree(req);
864 out1:
865 	return NULL;
866 }
867 
868 /**
869  * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
870  * @r_xprt: controlling transport instance
871  * @req: rpcrdma_req object to set up
872  *
873  * Returns zero on success, and a negative errno on failure.
874  */
875 int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
876 {
877 	struct rpcrdma_regbuf *rb;
878 	size_t maxhdrsize;
879 
880 	/* Compute maximum header buffer size in bytes */
881 	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
882 		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
883 	maxhdrsize *= sizeof(__be32);
884 	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
885 				  DMA_TO_DEVICE, GFP_KERNEL);
886 	if (!rb)
887 		goto out;
888 
889 	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
890 		goto out_free;
891 
892 	req->rl_rdmabuf = rb;
893 	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
894 	return 0;
895 
896 out_free:
897 	rpcrdma_regbuf_free(rb);
898 out:
899 	return -ENOMEM;
900 }
901 
902 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
903  * and thus can be walked without holding rb_lock. E.g., the
904  * caller is holding the transport send lock to exclude
905  * device removal or disconnection.
906  */
907 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
908 {
909 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
910 	struct rpcrdma_req *req;
911 	int rc;
912 
913 	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
914 		rc = rpcrdma_req_setup(r_xprt, req);
915 		if (rc)
916 			return rc;
917 	}
918 	return 0;
919 }
920 
921 static void rpcrdma_req_reset(struct rpcrdma_req *req)
922 {
923 	/* Credits are valid for only one connection */
924 	req->rl_slot.rq_cong = 0;
925 
926 	rpcrdma_regbuf_free(req->rl_rdmabuf);
927 	req->rl_rdmabuf = NULL;
928 
929 	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
930 	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
931 
932 	frwr_reset(req);
933 }
934 
935 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
936  * and thus can be walked without holding rb_lock. E.g., the
937  * caller is holding the transport send lock to exclude
938  * device removal or disconnection.
939  */
940 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
941 {
942 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
943 	struct rpcrdma_req *req;
944 
945 	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
946 		rpcrdma_req_reset(req);
947 }
948 
949 static noinline
950 struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
951 				       bool temp)
952 {
953 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
954 	struct rpcrdma_rep *rep;
955 
956 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
957 	if (rep == NULL)
958 		goto out;
959 
960 	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
961 					       DMA_FROM_DEVICE, GFP_KERNEL);
962 	if (!rep->rr_rdmabuf)
963 		goto out_free;
964 
965 	rep->rr_cid.ci_completion_id =
966 		atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);
967 
968 	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
969 		     rdmab_length(rep->rr_rdmabuf));
970 	rep->rr_cqe.done = rpcrdma_wc_receive;
971 	rep->rr_rxprt = r_xprt;
972 	rep->rr_recv_wr.next = NULL;
973 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
974 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
975 	rep->rr_recv_wr.num_sge = 1;
976 	rep->rr_temp = temp;
977 
978 	spin_lock(&buf->rb_lock);
979 	list_add(&rep->rr_all, &buf->rb_all_reps);
980 	spin_unlock(&buf->rb_lock);
981 	return rep;
982 
983 out_free:
984 	kfree(rep);
985 out:
986 	return NULL;
987 }
988 
989 static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
990 {
991 	rpcrdma_regbuf_free(rep->rr_rdmabuf);
992 	kfree(rep);
993 }
994 
995 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
996 {
997 	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;
998 
999 	spin_lock(&buf->rb_lock);
1000 	list_del(&rep->rr_all);
1001 	spin_unlock(&buf->rb_lock);
1002 
1003 	rpcrdma_rep_free(rep);
1004 }
1005 
1006 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1007 {
1008 	struct llist_node *node;
1009 
1010 	/* Calls to llist_del_first are required to be serialized */
1011 	node = llist_del_first(&buf->rb_free_reps);
1012 	if (!node)
1013 		return NULL;
1014 	return llist_entry(node, struct rpcrdma_rep, rr_node);
1015 }
1016 
1017 /**
1018  * rpcrdma_rep_put - Release rpcrdma_rep back to free list
1019  * @buf: buffer pool
1020  * @rep: rep to release
1021  *
1022  */
1023 void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
1024 {
1025 	llist_add(&rep->rr_node, &buf->rb_free_reps);
1026 }
1027 
1028 /* Caller must ensure the QP is quiescent (RQ is drained) before
1029  * invoking this function, to guarantee rb_all_reps is not
1030  * changing.
1031  */
1032 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1033 {
1034 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1035 	struct rpcrdma_rep *rep;
1036 
1037 	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
1038 		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1039 		rep->rr_temp = true;	/* Mark this rep for destruction */
1040 	}
1041 }
1042 
1043 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1044 {
1045 	struct rpcrdma_rep *rep;
1046 
1047 	spin_lock(&buf->rb_lock);
1048 	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
1049 					       struct rpcrdma_rep,
1050 					       rr_all)) != NULL) {
1051 		list_del(&rep->rr_all);
1052 		spin_unlock(&buf->rb_lock);
1053 
1054 		rpcrdma_rep_free(rep);
1055 
1056 		spin_lock(&buf->rb_lock);
1057 	}
1058 	spin_unlock(&buf->rb_lock);
1059 }
1060 
1061 /**
1062  * rpcrdma_buffer_create - Create initial set of req/rep objects
1063  * @r_xprt: transport instance to (re)initialize
1064  *
1065  * Returns zero on success, otherwise a negative errno.
1066  */
1067 int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1068 {
1069 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1070 	int i, rc;
1071 
1072 	buf->rb_bc_srv_max_requests = 0;
1073 	spin_lock_init(&buf->rb_lock);
1074 	INIT_LIST_HEAD(&buf->rb_mrs);
1075 	INIT_LIST_HEAD(&buf->rb_all_mrs);
1076 	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1077 
1078 	INIT_LIST_HEAD(&buf->rb_send_bufs);
1079 	INIT_LIST_HEAD(&buf->rb_allreqs);
1080 	INIT_LIST_HEAD(&buf->rb_all_reps);
1081 
1082 	rc = -ENOMEM;
1083 	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
1084 		struct rpcrdma_req *req;
1085 
1086 		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
1087 					 GFP_KERNEL);
1088 		if (!req)
1089 			goto out;
1090 		list_add(&req->rl_list, &buf->rb_send_bufs);
1091 	}
1092 
1093 	init_llist_head(&buf->rb_free_reps);
1094 
1095 	return 0;
1096 out:
1097 	rpcrdma_buffer_destroy(buf);
1098 	return rc;
1099 }
1100 
1101 /**
1102  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1103  * @req: unused object to be destroyed
1104  *
1105  * Relies on caller holding the transport send lock to protect
1106  * removing req->rl_all from buf->rb_all_reqs safely.
1107  */
1108 void rpcrdma_req_destroy(struct rpcrdma_req *req)
1109 {
1110 	struct rpcrdma_mr *mr;
1111 
1112 	list_del(&req->rl_all);
1113 
1114 	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1115 		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1116 
1117 		spin_lock(&buf->rb_lock);
1118 		list_del(&mr->mr_all);
1119 		spin_unlock(&buf->rb_lock);
1120 
1121 		frwr_mr_release(mr);
1122 	}
1123 
1124 	rpcrdma_regbuf_free(req->rl_recvbuf);
1125 	rpcrdma_regbuf_free(req->rl_sendbuf);
1126 	rpcrdma_regbuf_free(req->rl_rdmabuf);
1127 	kfree(req);
1128 }
1129 
1130 /**
1131  * rpcrdma_mrs_destroy - Release all of a transport's MRs
1132  * @r_xprt: controlling transport instance
1133  *
1134  * Relies on caller holding the transport send lock to protect
1135  * removing mr->mr_list from req->rl_free_mrs safely.
1136  */
1137 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1138 {
1139 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1140 	struct rpcrdma_mr *mr;
1141 
1142 	cancel_work_sync(&buf->rb_refresh_worker);
1143 
1144 	spin_lock(&buf->rb_lock);
1145 	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1146 					      struct rpcrdma_mr,
1147 					      mr_all)) != NULL) {
1148 		list_del(&mr->mr_list);
1149 		list_del(&mr->mr_all);
1150 		spin_unlock(&buf->rb_lock);
1151 
1152 		frwr_mr_release(mr);
1153 
1154 		spin_lock(&buf->rb_lock);
1155 	}
1156 	spin_unlock(&buf->rb_lock);
1157 }
1158 
1159 /**
1160  * rpcrdma_buffer_destroy - Release all hw resources
1161  * @buf: root control block for resources
1162  *
1163  * ORDERING: relies on a prior rpcrdma_xprt_drain :
1164  * - No more Send or Receive completions can occur
1165  * - All MRs, reps, and reqs are returned to their free lists
1166  */
1167 void
1168 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1169 {
1170 	rpcrdma_reps_destroy(buf);
1171 
1172 	while (!list_empty(&buf->rb_send_bufs)) {
1173 		struct rpcrdma_req *req;
1174 
1175 		req = list_first_entry(&buf->rb_send_bufs,
1176 				       struct rpcrdma_req, rl_list);
1177 		list_del(&req->rl_list);
1178 		rpcrdma_req_destroy(req);
1179 	}
1180 }
1181 
1182 /**
1183  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1184  * @r_xprt: controlling transport
1185  *
1186  * Returns an initialized rpcrdma_mr or NULL if no free
1187  * rpcrdma_mr objects are available.
1188  */
1189 struct rpcrdma_mr *
1190 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1191 {
1192 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1193 	struct rpcrdma_mr *mr;
1194 
1195 	spin_lock(&buf->rb_lock);
1196 	mr = rpcrdma_mr_pop(&buf->rb_mrs);
1197 	spin_unlock(&buf->rb_lock);
1198 	return mr;
1199 }
1200 
1201 /**
1202  * rpcrdma_reply_put - Put reply buffers back into pool
1203  * @buffers: buffer pool
1204  * @req: object to return
1205  *
1206  */
1207 void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1208 {
1209 	if (req->rl_reply) {
1210 		rpcrdma_rep_put(buffers, req->rl_reply);
1211 		req->rl_reply = NULL;
1212 	}
1213 }
1214 
1215 /**
1216  * rpcrdma_buffer_get - Get a request buffer
1217  * @buffers: Buffer pool from which to obtain a buffer
1218  *
1219  * Returns a fresh rpcrdma_req, or NULL if none are available.
1220  */
1221 struct rpcrdma_req *
1222 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1223 {
1224 	struct rpcrdma_req *req;
1225 
1226 	spin_lock(&buffers->rb_lock);
1227 	req = list_first_entry_or_null(&buffers->rb_send_bufs,
1228 				       struct rpcrdma_req, rl_list);
1229 	if (req)
1230 		list_del_init(&req->rl_list);
1231 	spin_unlock(&buffers->rb_lock);
1232 	return req;
1233 }
1234 
1235 /**
1236  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1237  * @buffers: buffer pool
1238  * @req: object to return
1239  *
1240  */
1241 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1242 {
1243 	rpcrdma_reply_put(buffers, req);
1244 
1245 	spin_lock(&buffers->rb_lock);
1246 	list_add(&req->rl_list, &buffers->rb_send_bufs);
1247 	spin_unlock(&buffers->rb_lock);
1248 }
1249 
1250 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1251  *
1252  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1253  * receiving the payload of RDMA RECV operations. During Long Calls
1254  * or Replies they may be registered externally via frwr_map.
1255  */
1256 static struct rpcrdma_regbuf *
1257 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1258 		     gfp_t flags)
1259 {
1260 	struct rpcrdma_regbuf *rb;
1261 
1262 	rb = kmalloc(sizeof(*rb), flags);
1263 	if (!rb)
1264 		return NULL;
1265 	rb->rg_data = kmalloc(size, flags);
1266 	if (!rb->rg_data) {
1267 		kfree(rb);
1268 		return NULL;
1269 	}
1270 
1271 	rb->rg_device = NULL;
1272 	rb->rg_direction = direction;
1273 	rb->rg_iov.length = size;
1274 	return rb;
1275 }
1276 
1277 /**
1278  * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1279  * @rb: regbuf to reallocate
1280  * @size: size of buffer to be allocated, in bytes
1281  * @flags: GFP flags
1282  *
1283  * Returns true if reallocation was successful. If false is
1284  * returned, @rb is left untouched.
1285  */
1286 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1287 {
1288 	void *buf;
1289 
1290 	buf = kmalloc(size, flags);
1291 	if (!buf)
1292 		return false;
1293 
1294 	rpcrdma_regbuf_dma_unmap(rb);
1295 	kfree(rb->rg_data);
1296 
1297 	rb->rg_data = buf;
1298 	rb->rg_iov.length = size;
1299 	return true;
1300 }
1301 
1302 /**
1303  * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1304  * @r_xprt: controlling transport instance
1305  * @rb: regbuf to be mapped
1306  *
1307  * Returns true if the buffer is now DMA mapped to @r_xprt's device
1308  */
1309 bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1310 			      struct rpcrdma_regbuf *rb)
1311 {
1312 	struct ib_device *device = r_xprt->rx_ep->re_id->device;
1313 
1314 	if (rb->rg_direction == DMA_NONE)
1315 		return false;
1316 
1317 	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1318 					    rdmab_length(rb), rb->rg_direction);
1319 	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1320 		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1321 		return false;
1322 	}
1323 
1324 	rb->rg_device = device;
1325 	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
1326 	return true;
1327 }
1328 
1329 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1330 {
1331 	if (!rb)
1332 		return;
1333 
1334 	if (!rpcrdma_regbuf_is_mapped(rb))
1335 		return;
1336 
1337 	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1338 			    rb->rg_direction);
1339 	rb->rg_device = NULL;
1340 }
1341 
1342 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1343 {
1344 	rpcrdma_regbuf_dma_unmap(rb);
1345 	if (rb)
1346 		kfree(rb->rg_data);
1347 	kfree(rb);
1348 }
1349 
1350 /**
1351  * rpcrdma_post_recvs - Refill the Receive Queue
1352  * @r_xprt: controlling transport instance
1353  * @needed: current credit grant
1354  * @temp: mark Receive buffers to be deleted after one use
1355  *
1356  */
1357 void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
1358 {
1359 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1360 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
1361 	struct ib_recv_wr *wr, *bad_wr;
1362 	struct rpcrdma_rep *rep;
1363 	int count, rc;
1364 
1365 	rc = 0;
1366 	count = 0;
1367 
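	/* Top up the Receive Queue: post enough Receives to cover the
	 * current credit grant, plus a batch of spares so the RQ is
	 * not exhausted between replenishments.
	 */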
1368 	if (likely(ep->re_receive_count > needed))
1369 		goto out;
1370 	needed -= ep->re_receive_count;
1371 	if (!temp)
1372 		needed += RPCRDMA_MAX_RECV_BATCH;
1373 
1374 	if (atomic_inc_return(&ep->re_receiving) > 1)
1375 		goto out;
1376 
1377 	/* fast path: all needed reps can be found on the free list */
1378 	wr = NULL;
1379 	while (needed) {
1380 		rep = rpcrdma_rep_get_locked(buf);
1381 		if (rep && rep->rr_temp) {
1382 			rpcrdma_rep_destroy(rep);
1383 			continue;
1384 		}
1385 		if (!rep)
1386 			rep = rpcrdma_rep_create(r_xprt, temp);
1387 		if (!rep)
1388 			break;
1389 		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) {
1390 			rpcrdma_rep_put(buf, rep);
1391 			break;
1392 		}
1393 
1394 		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
1395 		trace_xprtrdma_post_recv(rep);
1396 		rep->rr_recv_wr.next = wr;
1397 		wr = &rep->rr_recv_wr;
1398 		--needed;
1399 		++count;
1400 	}
1401 	if (!wr)
1402 		goto out;
1403 
1404 	rc = ib_post_recv(ep->re_id->qp, wr,
1405 			  (const struct ib_recv_wr **)&bad_wr);
1406 	if (rc) {
1407 		trace_xprtrdma_post_recvs_err(r_xprt, rc);
1408 		for (wr = bad_wr; wr;) {
1409 			struct rpcrdma_rep *rep;
1410 
1411 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1412 			wr = wr->next;
1413 			rpcrdma_rep_put(buf, rep);
1414 			--count;
1415 		}
1416 	}
1417 	if (atomic_dec_return(&ep->re_receiving) > 0)
1418 		complete(&ep->re_done);
1419 
1420 out:
1421 	trace_xprtrdma_post_recvs(r_xprt, count);
1422 	ep->re_receive_count += count;
1423 	return;
1424 }
1425