1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3 * Copyright (c) 2014-2017 Oracle. All rights reserved.
4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
9 * COPYING in the main directory of this source tree, or the BSD-type
39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 * This file contains the guts of the RPC RDMA protocol, and
47 * to the Linux RPC framework lives.
57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
61 /* Returns size of largest RPC-over-RDMA header in a Call message
63 * The largest Call header contains a full-size Read list and a
82 dprintk("RPC: %s: max call header size = %u\n", in rpcrdma_max_call_header_size()
87 /* Returns size of largest RPC-over-RDMA header in a Reply message
105 dprintk("RPC: %s: max reply header size = %u\n", in rpcrdma_max_reply_header_size()
112 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; in rpcrdma_set_max_header_sizes()
113 struct rpcrdma_ia *ia = &r_xprt->rx_ia; in rpcrdma_set_max_header_sizes()
114 unsigned int maxsegs = ia->ri_max_segs; in rpcrdma_set_max_header_sizes()
116 ia->ri_max_inline_write = cdata->inline_wsize - in rpcrdma_set_max_header_sizes()
118 ia->ri_max_inline_read = cdata->inline_rsize - in rpcrdma_set_max_header_sizes()
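The two subtractions above carve the worst-case transport header out of the negotiated inline thresholds; what is left over is the payload that may travel inline. A rough, self-contained illustration of that budget, using the wire sizes implied by the encoding keys later in this listing (a Read list entry is a 4-byte discriminator plus a 20-byte Position+HLOO segment). This is a simplified estimate, not the kernel's exact rpcrdma_max_call_header_size() arithmetic, and the est_* names are made up:

#include <stddef.h>
#include <stdint.h>

/* Rough worst-case RPC-over-RDMA v1 Call header: the fixed header
 * (xid, vers, credits, proc), one Read list entry per segment, and
 * the three 32-bit words that terminate or void the chunk lists.
 * Simplified: the real code also budgets head/tail segments and a
 * full Reply chunk.
 */
static size_t est_max_call_header(unsigned int maxsegs)
{
        size_t fixed = 4 * sizeof(uint32_t);      /* xid, vers, credits, proc */
        size_t read_entry = 4 + 4 + 4 + 4 + 8;    /* present + P + H + L + OO */
        size_t list_ends = 3 * sizeof(uint32_t);  /* Read 0, Write 0, Reply 0 */

        return fixed + (size_t)maxsegs * read_entry + list_ends;
}

/* Inline send budget = negotiated inline threshold minus that header. */
static size_t est_max_inline_write(size_t inline_wsize, unsigned int maxsegs)
{
        size_t hdr = est_max_call_header(maxsegs);

        return inline_wsize > hdr ? inline_wsize - hdr : 0;
}

With, say, eight segments and a 4096-byte inline threshold, this estimate leaves 4096 - (16 + 8*24 + 12) = 3876 bytes for inline payload.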
123 * plus the RPC call fit under the transport's inline limit. If the
127 * A Read chunk is also required if sending the RPC call inline would
133 struct xdr_buf *xdr = &rqst->rq_snd_buf; in rpcrdma_args_inline()
136 if (xdr->len > r_xprt->rx_ia.ri_max_inline_write) in rpcrdma_args_inline()
139 if (xdr->page_len) { in rpcrdma_args_inline()
140 remaining = xdr->page_len; in rpcrdma_args_inline()
141 offset = offset_in_page(xdr->page_base); in rpcrdma_args_inline()
144 remaining -= min_t(unsigned int, in rpcrdma_args_inline()
145 PAGE_SIZE - offset, remaining); in rpcrdma_args_inline()
147 if (++count > r_xprt->rx_ia.ri_max_send_sges) in rpcrdma_args_inline()
157 * operation. If the maximum combined reply message size exceeds that
164 struct rpcrdma_ia *ia = &r_xprt->rx_ia; in rpcrdma_results_inline()
166 return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; in rpcrdma_results_inline()
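rpcrdma_args_inline() and rpcrdma_results_inline() reduce to threshold checks: a Call goes inline only if its total XDR length fits under ri_max_inline_write and its page list can be covered by the available Send SGEs; a Reply is expected inline only if the whole receive buffer fits under ri_max_inline_read. A condensed sketch of the same tests, with hypothetical types (struct inline_limits and these function names are illustrative, not kernel structures):

#include <stdbool.h>
#include <stdint.h>

#define PAGE_SIZE 4096u

struct inline_limits {
        uint32_t max_inline_write;      /* cf. ri_max_inline_write */
        uint32_t max_inline_read;       /* cf. ri_max_inline_read */
        uint32_t max_send_sges;         /* cf. ri_max_send_sges */
};

/* Call fits inline: total length under budget, and the page list
 * needs no more SGEs than the device offers.
 */
static bool call_fits_inline(const struct inline_limits *lim, uint32_t xdr_len,
                             uint32_t page_base, uint32_t page_len)
{
        uint32_t offset = page_base & (PAGE_SIZE - 1);
        uint32_t remaining = page_len, count = 0;

        if (xdr_len > lim->max_inline_write)
                return false;

        while (remaining) {
                uint32_t chunk = PAGE_SIZE - offset;

                remaining -= chunk < remaining ? chunk : remaining;
                offset = 0;
                if (++count > lim->max_send_sges)
                        return false;
        }
        return true;
}

/* Reply fits inline: the whole receive buffer is under budget. */
static bool reply_fits_inline(const struct inline_limits *lim, uint32_t buflen)
{
        return buflen <= lim->max_inline_read;
}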
183 base = vec->iov_base; in rpcrdma_convert_kvec()
185 remaining = vec->iov_len; in rpcrdma_convert_kvec()
187 seg->mr_page = NULL; in rpcrdma_convert_kvec()
188 seg->mr_offset = base; in rpcrdma_convert_kvec()
189 seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); in rpcrdma_convert_kvec()
190 remaining -= seg->mr_len; in rpcrdma_convert_kvec()
191 base += seg->mr_len; in rpcrdma_convert_kvec()
216 if (pos == 0) in rpcrdma_convert_iovs()
217 seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n); in rpcrdma_convert_iovs()
219 len = xdrbuf->page_len; in rpcrdma_convert_iovs()
220 ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); in rpcrdma_convert_iovs()
221 page_base = offset_in_page(xdrbuf->page_base); in rpcrdma_convert_iovs()
223 if (unlikely(!*ppages)) { in rpcrdma_convert_iovs()
228 if (!*ppages) in rpcrdma_convert_iovs()
229 return -ENOBUFS; in rpcrdma_convert_iovs()
231 seg->mr_page = *ppages; in rpcrdma_convert_iovs()
232 seg->mr_offset = (char *)page_base; in rpcrdma_convert_iovs()
233 seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len); in rpcrdma_convert_iovs()
234 len -= seg->mr_len; in rpcrdma_convert_iovs()
244 if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup) in rpcrdma_convert_iovs()
248 * extra segment for non-XDR-aligned Write chunks. The upper in rpcrdma_convert_iovs()
252 if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup) in rpcrdma_convert_iovs()
255 if (xdrbuf->tail[0].iov_len) in rpcrdma_convert_iovs()
256 seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); in rpcrdma_convert_iovs()
259 if (unlikely(n > RPCRDMA_MAX_SEGS)) in rpcrdma_convert_iovs()
260 return -EIO; in rpcrdma_convert_iovs()
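rpcrdma_convert_iovs() flattens an xdr_buf (head kvec, page list, tail kvec) into an array of page-sized segments that the memory-registration code can map, failing with -EIO once RPCRDMA_MAX_SEGS is exceeded. The page-list half of that walk, reduced to a standalone sketch (MAX_SEGS, struct seg and split_pages are illustrative stand-ins, not the kernel's types):

#include <stddef.h>
#include <stdint.h>

#define PAGE_SIZE 4096u
#define MAX_SEGS  8             /* stand-in for RPCRDMA_MAX_SEGS */

/* Hypothetical segment descriptor, one entry per page-sized piece. */
struct seg {
        void     *page;         /* NULL for kvec-backed data */
        uint32_t  offset;
        uint32_t  len;
};

/* Split a byte range that starts part-way into a page array into
 * page-bounded segments, the same way the xdr_buf page list is carved
 * up above.  Returns the number of segments used, or -1 if the range
 * would need more than MAX_SEGS.
 */
static int split_pages(void **pages, uint32_t page_base, uint32_t len,
                       struct seg *segs)
{
        unsigned int n = 0;
        uint32_t base = page_base & (PAGE_SIZE - 1);

        pages += page_base / PAGE_SIZE;
        while (len) {
                uint32_t chunk = PAGE_SIZE - base;

                if (chunk > len)
                        chunk = len;
                if (n == MAX_SEGS)
                        return -1;
                segs[n].page = *pages++;
                segs[n].offset = base;
                segs[n].len = chunk;
                len -= chunk;
                base = 0;
                ++n;
        }
        return n;
}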
270 if (unlikely(!p)) in encode_item_present()
271 return -EMSGSIZE; in encode_item_present()
283 if (unlikely(!p)) in encode_item_not_present()
284 return -EMSGSIZE; in encode_item_not_present()
293 *iptr++ = cpu_to_be32(mr->mr_handle); in xdr_encode_rdma_segment()
294 *iptr++ = cpu_to_be32(mr->mr_length); in xdr_encode_rdma_segment()
295 xdr_encode_hyper(iptr, mr->mr_offset); in xdr_encode_rdma_segment()
304 if (unlikely(!p)) in encode_rdma_segment()
305 return -EMSGSIZE; in encode_rdma_segment()
318 if (unlikely(!p)) in encode_read_segment()
319 return -EMSGSIZE; in encode_read_segment()
330 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
334 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
336 * Returns zero on success, or a negative errno if a failure occurred.
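The key above describes the Read list on the wire: every entry is an item-present discriminator followed by a Position and an HLOO segment, and a single zero word closes the list. A minimal sketch of packing one such entry into big-endian XDR words (pack_read_entry is an illustrative name; the kernel streams these through xdr_stream helpers such as xdr_encode_rdma_segment() above):

#include <stdint.h>
#include <arpa/inet.h>          /* htonl() */

/* Pack one Read list entry as described by the key:
 * item-present (1), Position, Handle, Length, Offset64.
 * Returns the number of 32-bit words written (6).
 */
static int pack_read_entry(uint32_t *out, uint32_t position,
                           uint32_t handle, uint32_t length, uint64_t offset)
{
        *out++ = htonl(1);                         /* "1": entry follows */
        *out++ = htonl(position);                  /* P */
        *out++ = htonl(handle);                    /* H */
        *out++ = htonl(length);                    /* L */
        *out++ = htonl((uint32_t)(offset >> 32));  /* OO, high word first */
        *out++ = htonl((uint32_t)offset);
        return 6;
}

Each entry is therefore 6 XDR words (24 bytes), and the finished list costs one more zero word ("0" in the key).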
345 struct xdr_stream *xdr = &req->rl_stream; in rpcrdma_encode_read_list()
351 pos = rqst->rq_snd_buf.head[0].iov_len; in rpcrdma_encode_read_list()
352 if (rtype == rpcrdma_areadch) in rpcrdma_encode_read_list()
354 seg = req->rl_segments; in rpcrdma_encode_read_list()
355 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, in rpcrdma_encode_read_list()
357 if (nsegs < 0) in rpcrdma_encode_read_list()
361 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, in rpcrdma_encode_read_list()
363 if (IS_ERR(seg)) in rpcrdma_encode_read_list()
365 rpcrdma_mr_push(mr, &req->rl_registered); in rpcrdma_encode_read_list()
367 if (encode_read_segment(xdr, mr, pos) < 0) in rpcrdma_encode_read_list()
368 return -EMSGSIZE; in rpcrdma_encode_read_list()
370 trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs); in rpcrdma_encode_read_list()
371 r_xprt->rx_stats.read_chunk_count++; in rpcrdma_encode_read_list()
372 nsegs -= mr->mr_nents; in rpcrdma_encode_read_list()
382 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
386 * 1 - N - HLOO - HLOO - ... - HLOO - 0
388 * Returns zero on success, or a negative errno if a failure occurred.
397 struct xdr_stream *xdr = &req->rl_stream; in rpcrdma_encode_write_list()
403 seg = req->rl_segments; in rpcrdma_encode_write_list()
404 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, in rpcrdma_encode_write_list()
405 rqst->rq_rcv_buf.head[0].iov_len, in rpcrdma_encode_write_list()
407 if (nsegs < 0) in rpcrdma_encode_write_list()
410 if (encode_item_present(xdr) < 0) in rpcrdma_encode_write_list()
411 return -EMSGSIZE; in rpcrdma_encode_write_list()
413 if (unlikely(!segcount)) in rpcrdma_encode_write_list()
414 return -EMSGSIZE; in rpcrdma_encode_write_list()
419 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, in rpcrdma_encode_write_list()
421 if (IS_ERR(seg)) in rpcrdma_encode_write_list()
423 rpcrdma_mr_push(mr, &req->rl_registered); in rpcrdma_encode_write_list()
425 if (encode_rdma_segment(xdr, mr) < 0) in rpcrdma_encode_write_list()
426 return -EMSGSIZE; in rpcrdma_encode_write_list()
428 trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs); in rpcrdma_encode_write_list()
429 r_xprt->rx_stats.write_chunk_count++; in rpcrdma_encode_write_list()
430 r_xprt->rx_stats.total_rdma_request += mr->mr_length; in rpcrdma_encode_write_list()
432 nsegs -= mr->mr_nents; in rpcrdma_encode_write_list()
444 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
448 * 1 - N - HLOO - HLOO - ... - HLOO
450 * Returns zero on success, or a negative errno if a failure occurred.
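Write chunks and the Reply chunk use the counted-array form shown in the two keys above: a present discriminator, a segment count N, then N HLOO segments. The Write list additionally ends with a zero word, while the Reply chunk is a single optional item with no terminator. The byte cost, as a small arithmetic sketch (function names are illustrative):

#include <stddef.h>

/* On-the-wire size of a Write chunk carrying nsegs HLOO segments:
 * item-present + segment count + nsegs * (Handle32 + Length32 +
 * Offset64), plus the 32-bit zero that terminates the Write list.
 * The Reply chunk is the same minus the list terminator.
 */
static size_t write_chunk_bytes(unsigned int nsegs)
{
        return 4 + 4 + (size_t)nsegs * (4 + 4 + 8) + 4;
}

static size_t reply_chunk_bytes(unsigned int nsegs)
{
        return 4 + 4 + (size_t)nsegs * (4 + 4 + 8);
}

A four-segment Write chunk is thus 4 + 4 + 4*16 + 4 = 76 bytes of transport header.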
457 struct xdr_stream *xdr = &req->rl_stream; in rpcrdma_encode_reply_chunk()
463 seg = req->rl_segments; in rpcrdma_encode_reply_chunk()
464 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); in rpcrdma_encode_reply_chunk()
465 if (nsegs < 0) in rpcrdma_encode_reply_chunk()
468 if (encode_item_present(xdr) < 0) in rpcrdma_encode_reply_chunk()
469 return -EMSGSIZE; in rpcrdma_encode_reply_chunk()
471 if (unlikely(!segcount)) in rpcrdma_encode_reply_chunk()
472 return -EMSGSIZE; in rpcrdma_encode_reply_chunk()
477 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, in rpcrdma_encode_reply_chunk()
479 if (IS_ERR(seg)) in rpcrdma_encode_reply_chunk()
481 rpcrdma_mr_push(mr, &req->rl_registered); in rpcrdma_encode_reply_chunk()
483 if (encode_rdma_segment(xdr, mr) < 0) in rpcrdma_encode_reply_chunk()
484 return -EMSGSIZE; in rpcrdma_encode_reply_chunk()
486 trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs); in rpcrdma_encode_reply_chunk()
487 r_xprt->rx_stats.reply_chunk_count++; in rpcrdma_encode_reply_chunk()
488 r_xprt->rx_stats.total_rdma_request += mr->mr_length; in rpcrdma_encode_reply_chunk()
490 nsegs -= mr->mr_nents; in rpcrdma_encode_reply_chunk()
500 * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
507 struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia; in rpcrdma_unmap_sendctx()
513 * they can be cheaply re-used. in rpcrdma_unmap_sendctx()
515 sge = &sc->sc_sges[2]; in rpcrdma_unmap_sendctx()
516 for (count = sc->sc_unmap_count; count; ++sge, --count) in rpcrdma_unmap_sendctx()
517 ib_dma_unmap_page(ia->ri_device, in rpcrdma_unmap_sendctx()
518 sge->addr, sge->length, DMA_TO_DEVICE); in rpcrdma_unmap_sendctx()
520 if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) { in rpcrdma_unmap_sendctx()
522 wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES); in rpcrdma_unmap_sendctx()
526 /* Prepare an SGE for the RPC-over-RDMA transport header.
532 struct rpcrdma_sendctx *sc = req->rl_sendctx; in rpcrdma_prepare_hdr_sge()
533 struct rpcrdma_regbuf *rb = req->rl_rdmabuf; in rpcrdma_prepare_hdr_sge()
534 struct ib_sge *sge = sc->sc_sges; in rpcrdma_prepare_hdr_sge()
536 if (!rpcrdma_dma_map_regbuf(ia, rb)) in rpcrdma_prepare_hdr_sge()
538 sge->addr = rdmab_addr(rb); in rpcrdma_prepare_hdr_sge()
539 sge->length = len; in rpcrdma_prepare_hdr_sge()
540 sge->lkey = rdmab_lkey(rb); in rpcrdma_prepare_hdr_sge()
542 ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, in rpcrdma_prepare_hdr_sge()
543 sge->length, DMA_TO_DEVICE); in rpcrdma_prepare_hdr_sge()
544 sc->sc_wr.num_sge++; in rpcrdma_prepare_hdr_sge()
559 struct rpcrdma_sendctx *sc = req->rl_sendctx; in rpcrdma_prepare_msg_sges()
561 struct rpcrdma_regbuf *rb = req->rl_sendbuf; in rpcrdma_prepare_msg_sges()
562 struct ib_device *device = ia->ri_device; in rpcrdma_prepare_msg_sges()
563 struct ib_sge *sge = sc->sc_sges; in rpcrdma_prepare_msg_sges()
564 u32 lkey = ia->ri_pd->local_dma_lkey; in rpcrdma_prepare_msg_sges()
568 * DMA-mapped. Sync the content that has changed. in rpcrdma_prepare_msg_sges()
570 if (!rpcrdma_dma_map_regbuf(ia, rb)) in rpcrdma_prepare_msg_sges()
574 sge[sge_no].length = xdr->head[0].iov_len; in rpcrdma_prepare_msg_sges()
579 /* If there is a Read chunk, the page list is being handled in rpcrdma_prepare_msg_sges()
585 if (rtype == rpcrdma_readch) { in rpcrdma_prepare_msg_sges()
586 len = xdr->tail[0].iov_len; in rpcrdma_prepare_msg_sges()
588 /* Do not include the tail if it is only an XDR pad */ in rpcrdma_prepare_msg_sges()
589 if (len < 4) in rpcrdma_prepare_msg_sges()
592 page = virt_to_page(xdr->tail[0].iov_base); in rpcrdma_prepare_msg_sges()
593 page_base = offset_in_page(xdr->tail[0].iov_base); in rpcrdma_prepare_msg_sges()
595 /* If the content in the page list is an odd length, in rpcrdma_prepare_msg_sges()
597 * of the tail iovec. Force the tail's non-pad content in rpcrdma_prepare_msg_sges()
601 len -= len & 3; in rpcrdma_prepare_msg_sges()
605 /* If there is a page list present, temporarily DMA map in rpcrdma_prepare_msg_sges()
608 if (xdr->page_len) { in rpcrdma_prepare_msg_sges()
609 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); in rpcrdma_prepare_msg_sges()
610 page_base = offset_in_page(xdr->page_base); in rpcrdma_prepare_msg_sges()
611 remaining = xdr->page_len; in rpcrdma_prepare_msg_sges()
614 if (sge_no > RPCRDMA_MAX_SEND_SGES - 2) in rpcrdma_prepare_msg_sges()
617 len = min_t(u32, PAGE_SIZE - page_base, remaining); in rpcrdma_prepare_msg_sges()
621 if (ib_dma_mapping_error(device, sge[sge_no].addr)) in rpcrdma_prepare_msg_sges()
626 sc->sc_unmap_count++; in rpcrdma_prepare_msg_sges()
628 remaining -= len; in rpcrdma_prepare_msg_sges()
638 if (xdr->tail[0].iov_len) { in rpcrdma_prepare_msg_sges()
639 page = virt_to_page(xdr->tail[0].iov_base); in rpcrdma_prepare_msg_sges()
640 page_base = offset_in_page(xdr->tail[0].iov_base); in rpcrdma_prepare_msg_sges()
641 len = xdr->tail[0].iov_len; in rpcrdma_prepare_msg_sges()
648 if (ib_dma_mapping_error(device, sge[sge_no].addr)) in rpcrdma_prepare_msg_sges()
652 sc->sc_unmap_count++; in rpcrdma_prepare_msg_sges()
656 sc->sc_wr.num_sge += sge_no; in rpcrdma_prepare_msg_sges()
657 if (sc->sc_unmap_count) in rpcrdma_prepare_msg_sges()
658 __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); in rpcrdma_prepare_msg_sges()
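Taken together, rpcrdma_prepare_hdr_sge() and rpcrdma_prepare_msg_sges() lay the Send WR's gather list out as: sge[0] for the transport header, sge[1] for the head kvec, then one DMA-mapped entry per page of the page list (and possibly the tail). That layout, stripped of DMA mapping, as a sketch with hypothetical types (struct sge here is not ib_sge, and the plain-pointer "addresses" merely stand in for DMA addresses):

#include <stdint.h>

#define MAX_SEND_SGES 16        /* stand-in for the device/transport limit */

/* Hypothetical flattened view of a Send WR's scatter-gather list. */
struct sge {
        uint64_t addr;
        uint32_t length;
        uint32_t lkey;
};

/* Lay out the Send SGE list in the order used above:
 *   sge[0] = transport header, sge[1] = head kvec,
 *   sge[2..] = one entry per page of the page list (and the tail).
 * Real code DMA-maps each piece; here pointers are cast to u64 purely
 * to show the layout.
 */
static int build_send_sges(struct sge *sges, uint32_t lkey,
                           void *hdr, uint32_t hdrlen,
                           void *head, uint32_t headlen,
                           void **pages, uint32_t *pagelens,
                           unsigned int npages)
{
        unsigned int n = 0, i;

        sges[n++] = (struct sge){ (uint64_t)(uintptr_t)hdr, hdrlen, lkey };
        sges[n++] = (struct sge){ (uint64_t)(uintptr_t)head, headlen, lkey };
        for (i = 0; i < npages; i++) {
                if (n == MAX_SEND_SGES)
                        return -1;      /* would overrun the SGE array */
                sges[n++] = (struct sge){ (uint64_t)(uintptr_t)pages[i],
                                          pagelens[i], lkey };
        }
        return n;               /* becomes sc_wr.num_sge */
}

The real code also records how many of these entries were temporarily DMA-mapped (sc_unmap_count) and sets RPCRDMA_REQ_F_TX_RESOURCES so completion can wait until they are unmapped.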
677 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
679 * @req: context of RPC Call being marshalled
681 * @xdr: xdr_buf containing RPC Call
691 req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf); in rpcrdma_prepare_send_sges()
692 if (!req->rl_sendctx) in rpcrdma_prepare_send_sges()
693 return -EAGAIN; in rpcrdma_prepare_send_sges()
694 req->rl_sendctx->sc_wr.num_sge = 0; in rpcrdma_prepare_send_sges()
695 req->rl_sendctx->sc_unmap_count = 0; in rpcrdma_prepare_send_sges()
696 req->rl_sendctx->sc_req = req; in rpcrdma_prepare_send_sges()
697 __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); in rpcrdma_prepare_send_sges()
699 if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen)) in rpcrdma_prepare_send_sges()
700 return -EIO; in rpcrdma_prepare_send_sges()
702 if (rtype != rpcrdma_areadch) in rpcrdma_prepare_send_sges()
703 if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype)) in rpcrdma_prepare_send_sges()
704 return -EIO; in rpcrdma_prepare_send_sges()
710 * rpcrdma_marshal_req - Marshal and send one RPC request
712 * @rqst: RPC request to be marshaled
714 * For the RPC in "rqst", this function:
715 * - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
716 * - Registers Read, Write, and Reply chunks
717 * - Constructs the transport header
718 * - Posts a Send WR to send the transport header and request
721 * %0 if the RPC was sent successfully,
722 * %-ENOTCONN if the connection was lost,
723 * %-EAGAIN if the caller should call again with the same arguments,
724 * %-ENOBUFS if the caller should call again after a delay,
725 * %-EMSGSIZE if the transport header is too small,
726 * %-EIO if a permanent problem occurred while marshaling.
732 struct xdr_stream *xdr = &req->rl_stream; in rpcrdma_marshal_req()
738 rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); in rpcrdma_marshal_req()
739 xdr_init_encode(xdr, &req->rl_hdrbuf, in rpcrdma_marshal_req()
740 req->rl_rdmabuf->rg_base); in rpcrdma_marshal_req()
743 ret = -EMSGSIZE; in rpcrdma_marshal_req()
745 if (!p) in rpcrdma_marshal_req()
747 *p++ = rqst->rq_xid; in rpcrdma_marshal_req()
749 *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); in rpcrdma_marshal_req()
755 ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags & in rpcrdma_marshal_req()
761 * o If the expected result is under the inline threshold, all ops in rpcrdma_marshal_req()
765 * o Large non-read ops return as a single reply chunk. in rpcrdma_marshal_req()
767 if (rpcrdma_results_inline(r_xprt, rqst)) in rpcrdma_marshal_req()
769 else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) in rpcrdma_marshal_req()
777 * o If the total request is under the inline threshold, all ops in rpcrdma_marshal_req()
781 * o Large non-write ops are sent with the entire message as a in rpcrdma_marshal_req()
782 * single read chunk (protocol 0-position special case). in rpcrdma_marshal_req()
785 * that both has a data payload, and whose non-data arguments in rpcrdma_marshal_req()
788 if (rpcrdma_args_inline(r_xprt, rqst)) { in rpcrdma_marshal_req()
791 } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { in rpcrdma_marshal_req()
795 r_xprt->rx_stats.nomsg_call_count++; in rpcrdma_marshal_req()
800 /* If this is a retransmit, discard previously registered in rpcrdma_marshal_req()
804 while (unlikely(!list_empty(&req->rl_registered))) { in rpcrdma_marshal_req()
807 mr = rpcrdma_mr_pop(&req->rl_registered); in rpcrdma_marshal_req()
812 * of chunk lists in one RPC-over-RDMA Call message: in rpcrdma_marshal_req()
814 * - Read list in rpcrdma_marshal_req()
815 * - Write list in rpcrdma_marshal_req()
816 * - Reply chunk in rpcrdma_marshal_req()
817 * - Read list + Reply chunk in rpcrdma_marshal_req()
821 * - Read list + Write list in rpcrdma_marshal_req()
825 * - Write list + Reply chunk in rpcrdma_marshal_req()
826 * - Read list + Write list + Reply chunk in rpcrdma_marshal_req()
833 if (rtype != rpcrdma_noch) { in rpcrdma_marshal_req()
835 if (ret) in rpcrdma_marshal_req()
839 if (ret) in rpcrdma_marshal_req()
842 if (wtype == rpcrdma_writech) { in rpcrdma_marshal_req()
844 if (ret) in rpcrdma_marshal_req()
848 if (ret) in rpcrdma_marshal_req()
851 if (wtype != rpcrdma_replych) in rpcrdma_marshal_req()
855 if (ret) in rpcrdma_marshal_req()
861 &rqst->rq_snd_buf, rtype); in rpcrdma_marshal_req()
862 if (ret) in rpcrdma_marshal_req()
868 case -EAGAIN: in rpcrdma_marshal_req()
869 xprt_wait_for_buffer_space(rqst->rq_task, NULL); in rpcrdma_marshal_req()
871 case -ENOBUFS: in rpcrdma_marshal_req()
874 r_xprt->rx_stats.failed_marshal_count++; in rpcrdma_marshal_req()
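The comments in rpcrdma_marshal_req() spell out how the transfer modes for the Call and the expected Reply are chosen. Restated as a pair of small decision helpers (the enum and function names are illustrative; the kernel uses rpcrdma_noch/readch/areadch and rpcrdma_writech/replych):

#include <stdbool.h>

enum call_mode  { CALL_INLINE, CALL_READ_CHUNK, CALL_POSITION_ZERO };
enum reply_mode { REPLY_INLINE, REPLY_WRITE_CHUNK, REPLY_REPLY_CHUNK };

/* Argument side: inline if it fits; a Read chunk if direct data
 * placement is allowed and there is a data payload; otherwise the
 * whole Call goes in a position-zero Read chunk (RDMA_NOMSG).
 */
static enum call_mode choose_call_mode(bool fits_inline, bool ddp_allowed,
                                       bool has_data_payload)
{
        if (fits_inline)
                return CALL_INLINE;             /* RDMA_MSG, no Read chunk */
        if (ddp_allowed && has_data_payload)
                return CALL_READ_CHUNK;         /* RDMA_MSG + Read chunk */
        return CALL_POSITION_ZERO;              /* RDMA_NOMSG */
}

/* Result side: inline if the expected reply fits; a Write chunk for
 * large read-like results when DDP is allowed; otherwise a Reply chunk.
 */
static enum reply_mode choose_reply_mode(bool result_fits_inline,
                                         bool ddp_allowed, bool wants_pages)
{
        if (result_fits_inline)
                return REPLY_INLINE;
        if (ddp_allowed && wants_pages)
                return REPLY_WRITE_CHUNK;
        return REPLY_REPLY_CHUNK;
}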
880 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
881 * @rqst: controlling RPC request
882 * @srcp: points to RPC message payload in receive buffer
906 /* The head iovec is redirected to the RPC reply message in rpcrdma_inline_fixup()
909 rqst->rq_rcv_buf.head[0].iov_base = srcp; in rpcrdma_inline_fixup()
910 rqst->rq_private_buf.head[0].iov_base = srcp; in rpcrdma_inline_fixup()
915 curlen = rqst->rq_rcv_buf.head[0].iov_len; in rpcrdma_inline_fixup()
916 if (curlen > copy_len) in rpcrdma_inline_fixup()
920 copy_len -= curlen; in rpcrdma_inline_fixup()
922 ppages = rqst->rq_rcv_buf.pages + in rpcrdma_inline_fixup()
923 (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT); in rpcrdma_inline_fixup()
924 page_base = offset_in_page(rqst->rq_rcv_buf.page_base); in rpcrdma_inline_fixup()
926 if (copy_len && rqst->rq_rcv_buf.page_len) { in rpcrdma_inline_fixup()
929 pagelist_len = rqst->rq_rcv_buf.page_len; in rpcrdma_inline_fixup()
930 if (pagelist_len > copy_len) in rpcrdma_inline_fixup()
934 curlen = PAGE_SIZE - page_base; in rpcrdma_inline_fixup()
935 if (curlen > pagelist_len) in rpcrdma_inline_fixup()
945 copy_len -= curlen; in rpcrdma_inline_fixup()
947 pagelist_len -= curlen; in rpcrdma_inline_fixup()
948 if (!pagelist_len) in rpcrdma_inline_fixup()
959 if (pad) in rpcrdma_inline_fixup()
960 srcp -= pad; in rpcrdma_inline_fixup()
966 if (copy_len || pad) { in rpcrdma_inline_fixup()
967 rqst->rq_rcv_buf.tail[0].iov_base = srcp; in rpcrdma_inline_fixup()
968 rqst->rq_private_buf.tail[0].iov_base = srcp; in rpcrdma_inline_fixup()
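rpcrdma_inline_fixup() scatters a contiguous inline reply into the RPC receive buffer: the head iovec is simply re-pointed at the payload, the page-list portion is copied page by page, and whatever remains is exposed through the tail iovec. A reduced sketch of that order of operations (struct reply_buf is a hypothetical stand-in for xdr_buf, and the XDR pad handling is omitted):

#include <stdint.h>
#include <string.h>

#define PAGE_SIZE 4096u

struct reply_buf {
        char     *head;    uint32_t head_len;
        char    **pages;   uint32_t page_base, page_len;
        char     *tail;
};

/* Returns the number of bytes physically copied into the page list. */
static uint32_t scatter_inline_reply(struct reply_buf *buf,
                                     char *src, uint32_t len)
{
        char **pp = buf->pages + buf->page_base / PAGE_SIZE;
        uint32_t base = buf->page_base & (PAGE_SIZE - 1);
        uint32_t copied = 0, curlen;

        buf->head = src;                /* head is redirected, not copied */
        curlen = buf->head_len < len ? buf->head_len : len;
        src += curlen;
        len -= curlen;

        for (uint32_t remaining = buf->page_len < len ? buf->page_len : len;
             remaining; ++pp, base = 0) {
                curlen = PAGE_SIZE - base;
                if (curlen > remaining)
                        curlen = remaining;
                memcpy(*pp + base, src, curlen);
                copied += curlen;
                src += curlen;
                len -= curlen;
                remaining -= curlen;
        }

        if (len)
                buf->tail = src;        /* tail is also redirected */
        return copied;
}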
976 * the RPC/RDMA header small and fixed in size, so it is
977 * straightforward to check the RPC header's direction field.
981 #if defined(CONFIG_SUNRPC_BACKCHANNEL) in rpcrdma_is_bcall()
983 struct xdr_stream *xdr = &rep->rr_stream; in rpcrdma_is_bcall()
986 if (rep->rr_proc != rdma_msg) in rpcrdma_is_bcall()
993 if (*p++ != xdr_zero) in rpcrdma_is_bcall()
995 if (*p++ != xdr_zero) in rpcrdma_is_bcall()
997 if (*p++ != xdr_zero) in rpcrdma_is_bcall()
1000 /* RPC header */ in rpcrdma_is_bcall()
1001 if (*p++ != rep->rr_xid) in rpcrdma_is_bcall()
1003 if (*p != cpu_to_be32(RPC_CALL)) in rpcrdma_is_bcall()
1007 * advance to the RPC header. in rpcrdma_is_bcall()
1010 if (unlikely(!p)) in rpcrdma_is_bcall()
1017 pr_warn("RPC/RDMA short backward direction call\n"); in rpcrdma_is_bcall()
1033 if (unlikely(!p)) in decode_rdma_segment()
1034 return -EIO; in decode_rdma_segment()
1050 if (unlikely(!p)) in decode_write_chunk()
1051 return -EIO; in decode_write_chunk()
1055 while (segcount--) { in decode_write_chunk()
1056 if (decode_rdma_segment(xdr, &seglength)) in decode_write_chunk()
1057 return -EIO; in decode_write_chunk()
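decode_write_chunk() walks one received Write chunk, a segment count followed by that many HLOO segments, and totals the segment lengths so the reply handler knows how many bytes the responder claims to have written. A standalone sketch over raw XDR words (sum_write_chunk is an illustrative name; the kernel decodes through an xdr_stream with bounds checks):

#include <stdint.h>
#include <arpa/inet.h>

/* "p" points at the chunk's segment count N.  Returns a pointer just
 * past the chunk and stores the byte total in *length.
 */
static const uint32_t *sum_write_chunk(const uint32_t *p, uint32_t *length)
{
        uint32_t segcount = ntohl(*p++);

        *length = 0;
        while (segcount--) {
                /* handle (1 word), length (1 word), offset (2 words) */
                *length += ntohl(p[1]);
                p += 4;
        }
        return p;
}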
1064 /* In RPC-over-RDMA Version One replies, a Read list is never
1065 * expected. This decoder is a stub that returns an error if
1073 if (unlikely(!p)) in decode_read_list()
1074 return -EIO; in decode_read_list()
1075 if (unlikely(*p != xdr_zero)) in decode_read_list()
1076 return -EIO; in decode_read_list()
1092 if (unlikely(!p)) in decode_write_list()
1093 return -EIO; in decode_write_list()
1094 if (*p == xdr_zero) in decode_write_list()
1096 if (!first) in decode_write_list()
1097 return -EIO; in decode_write_list()
1099 if (decode_write_chunk(xdr, &chunklen)) in decode_write_list()
1100 return -EIO; in decode_write_list()
1112 if (unlikely(!p)) in decode_reply_chunk()
1113 return -EIO; in decode_reply_chunk()
1116 if (*p != xdr_zero) in decode_reply_chunk()
1117 if (decode_write_chunk(xdr, length)) in decode_reply_chunk()
1118 return -EIO; in decode_reply_chunk()
1126 struct xdr_stream *xdr = &rep->rr_stream; in rpcrdma_decode_msg()
1131 if (decode_read_list(xdr)) in rpcrdma_decode_msg()
1132 return -EIO; in rpcrdma_decode_msg()
1133 if (decode_write_list(xdr, &writelist)) in rpcrdma_decode_msg()
1134 return -EIO; in rpcrdma_decode_msg()
1135 if (decode_reply_chunk(xdr, &replychunk)) in rpcrdma_decode_msg()
1136 return -EIO; in rpcrdma_decode_msg()
1139 if (unlikely(replychunk)) in rpcrdma_decode_msg()
1140 return -EIO; in rpcrdma_decode_msg()
1142 /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */ in rpcrdma_decode_msg()
1145 r_xprt->rx_stats.fixup_copy_count += in rpcrdma_decode_msg()
1148 r_xprt->rx_stats.total_rdma_reply += writelist; in rpcrdma_decode_msg()
1155 struct xdr_stream *xdr = &rep->rr_stream; in rpcrdma_decode_nomsg()
1159 if (decode_read_list(xdr)) in rpcrdma_decode_nomsg()
1160 return -EIO; in rpcrdma_decode_nomsg()
1161 if (decode_write_list(xdr, &writelist)) in rpcrdma_decode_nomsg()
1162 return -EIO; in rpcrdma_decode_nomsg()
1163 if (decode_reply_chunk(xdr, &replychunk)) in rpcrdma_decode_nomsg()
1164 return -EIO; in rpcrdma_decode_nomsg()
1167 if (unlikely(writelist)) in rpcrdma_decode_nomsg()
1168 return -EIO; in rpcrdma_decode_nomsg()
1169 if (unlikely(!replychunk)) in rpcrdma_decode_nomsg()
1170 return -EIO; in rpcrdma_decode_nomsg()
1173 r_xprt->rx_stats.total_rdma_reply += replychunk; in rpcrdma_decode_nomsg()
1181 struct xdr_stream *xdr = &rep->rr_stream; in rpcrdma_decode_error()
1185 if (unlikely(!p)) in rpcrdma_decode_error()
1186 return -EIO; in rpcrdma_decode_error()
1191 if (!p) in rpcrdma_decode_error()
1193 dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n", in rpcrdma_decode_error()
1194 rqst->rq_task->tk_pid, __func__, in rpcrdma_decode_error()
1198 dprintk("RPC: %5u: %s: server reports header decoding error\n", in rpcrdma_decode_error()
1199 rqst->rq_task->tk_pid, __func__); in rpcrdma_decode_error()
1202 dprintk("RPC: %5u: %s: server reports unrecognized error %d\n", in rpcrdma_decode_error()
1203 rqst->rq_task->tk_pid, __func__, be32_to_cpup(p)); in rpcrdma_decode_error()
1206 r_xprt->rx_stats.bad_reply_count++; in rpcrdma_decode_error()
1207 return -EREMOTEIO; in rpcrdma_decode_error()
1210 /* Perform XID lookup, reconstruction of the RPC reply, and
1211 * RPC completion while holding the transport lock to ensure
1216 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; in rpcrdma_complete_rqst()
1217 struct rpc_xprt *xprt = &r_xprt->rx_xprt; in rpcrdma_complete_rqst()
1218 struct rpc_rqst *rqst = rep->rr_rqst; in rpcrdma_complete_rqst()
1222 xprt->reestablish_timeout = 0; in rpcrdma_complete_rqst()
1224 switch (rep->rr_proc) { in rpcrdma_complete_rqst()
1235 status = -EIO; in rpcrdma_complete_rqst()
1237 if (status < 0) in rpcrdma_complete_rqst()
1241 spin_lock(&xprt->recv_lock); in rpcrdma_complete_rqst()
1242 cwnd = xprt->cwnd; in rpcrdma_complete_rqst()
1243 xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT; in rpcrdma_complete_rqst()
1244 if (xprt->cwnd > cwnd) in rpcrdma_complete_rqst()
1245 xprt_release_rqst_cong(rqst->rq_task); in rpcrdma_complete_rqst()
1247 xprt_complete_rqst(rqst->rq_task, status); in rpcrdma_complete_rqst()
1249 spin_unlock(&xprt->recv_lock); in rpcrdma_complete_rqst()
1252 /* If the incoming reply terminated a pending RPC, the next in rpcrdma_complete_rqst()
1253 * RPC call will post a replacement receive buffer as it is in rpcrdma_complete_rqst()
1258 r_xprt->rx_stats.bad_reply_count++; in rpcrdma_complete_rqst()
1259 status = -EIO; in rpcrdma_complete_rqst()
1269 * send flow control: waking the next RPC waits until this in rpcrdma_release_rqst()
1270 * RPC has relinquished all its Send Queue entries. in rpcrdma_release_rqst()
1272 if (!list_empty(&req->rl_registered)) in rpcrdma_release_rqst()
1273 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, in rpcrdma_release_rqst()
1274 &req->rl_registered); in rpcrdma_release_rqst()
1277 * the Send of the RPC Call have been unmapped before in rpcrdma_release_rqst()
1278 * allowing the RPC to complete. This protects argument in rpcrdma_release_rqst()
1279 * memory not controlled by the RPC client from being in rpcrdma_release_rqst()
1280 * re-used before we're done with it. in rpcrdma_release_rqst()
1282 if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { in rpcrdma_release_rqst()
1283 r_xprt->rx_stats.reply_waits_for_send++; in rpcrdma_release_rqst()
1284 out_of_line_wait_on_bit(&req->rl_flags, in rpcrdma_release_rqst()
1298 struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); in rpcrdma_deferred_completion()
1299 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; in rpcrdma_deferred_completion()
1302 if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) in rpcrdma_deferred_completion()
1303 r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered); in rpcrdma_deferred_completion()
1308 /* Process received RPC/RDMA messages.
1310 * Errors must result in the RPC task either being awakened, or
1315 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; in rpcrdma_reply_handler()
1316 struct rpc_xprt *xprt = &r_xprt->rx_xprt; in rpcrdma_reply_handler()
1317 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; in rpcrdma_reply_handler()
1323 --buf->rb_posted_receives; in rpcrdma_reply_handler()
1325 if (rep->rr_hdrbuf.head[0].iov_len == 0) in rpcrdma_reply_handler()
1329 xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, in rpcrdma_reply_handler()
1330 rep->rr_hdrbuf.head[0].iov_base); in rpcrdma_reply_handler()
1331 p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p)); in rpcrdma_reply_handler()
1332 if (unlikely(!p)) in rpcrdma_reply_handler()
1334 rep->rr_xid = *p++; in rpcrdma_reply_handler()
1335 rep->rr_vers = *p++; in rpcrdma_reply_handler()
1337 rep->rr_proc = *p++; in rpcrdma_reply_handler()
1339 if (rep->rr_vers != rpcrdma_version) in rpcrdma_reply_handler()
1342 if (rpcrdma_is_bcall(r_xprt, rep)) in rpcrdma_reply_handler()
1348 spin_lock(&xprt->recv_lock); in rpcrdma_reply_handler()
1349 rqst = xprt_lookup_rqst(xprt, rep->rr_xid); in rpcrdma_reply_handler()
1350 if (!rqst) in rpcrdma_reply_handler()
1354 if (credits == 0) in rpcrdma_reply_handler()
1356 else if (credits > buf->rb_max_requests) in rpcrdma_reply_handler()
1357 credits = buf->rb_max_requests; in rpcrdma_reply_handler()
1358 buf->rb_credits = credits; in rpcrdma_reply_handler()
1360 spin_unlock(&xprt->recv_lock); in rpcrdma_reply_handler()
1363 if (req->rl_reply) { in rpcrdma_reply_handler()
1364 trace_xprtrdma_leaked_rep(rqst, req->rl_reply); in rpcrdma_reply_handler()
1365 rpcrdma_recv_buffer_put(req->rl_reply); in rpcrdma_reply_handler()
1367 req->rl_reply = rep; in rpcrdma_reply_handler()
1368 rep->rr_rqst = rqst; in rpcrdma_reply_handler()
1369 clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); in rpcrdma_reply_handler()
1371 trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); in rpcrdma_reply_handler()
1374 queue_work(rpcrdma_receive_wq, &rep->rr_work); in rpcrdma_reply_handler()
1381 /* The RPC transaction has already been terminated, or the header in rpcrdma_reply_handler()
1385 spin_unlock(&xprt->recv_lock); in rpcrdma_reply_handler()
1392 /* If no pending RPC transaction was matched, post a replacement in rpcrdma_reply_handler()
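Finally, the reply path implements the credit-based flow control visible above: the credit value granted in each reply header is clamped (never zero, never above rb_max_requests) before being recorded, and rpcrdma_complete_rqst() later scales it into the RPC congestion window. The arithmetic as a sketch (the RPC_CWNDSHIFT value below is a stand-in, and both function names are illustrative):

#include <stdint.h>

#define RPC_CWNDSHIFT 8         /* stand-in; the RPC layer defines the scale */

static uint32_t clamp_credits(uint32_t granted, uint32_t max_requests)
{
        if (granted == 0)
                granted = 1;            /* never let the flow stall entirely */
        if (granted > max_requests)
                granted = max_requests;
        return granted;
}

static unsigned long credits_to_cwnd(uint32_t credits)
{
        return (unsigned long)credits << RPC_CWNDSHIFT;
}

If the scaled window grew, the transport releases the next congestion-blocked request, which is what xprt_release_rqst_cong() does in the excerpt above.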