path: root/net/sunrpc/xprtrdma/svc_rdma_rw.c
author	Linus Torvalds <torvalds@linux-foundation.org>	2020-12-16 05:52:30 +0300
committer	Linus Torvalds <torvalds@linux-foundation.org>	2020-12-16 05:52:30 +0300
commit	1a50ede2b3c846761a71c409f53e9121311a13c2 (patch)
tree	40f228239b077c2db87580dfd66556ad0ca7046f /net/sunrpc/xprtrdma/svc_rdma_rw.c
parent	9867cb1fd510187d8f828540bdb48f78fceb70b3 (diff)
parent	716a8bc7f706eeef80ab42c99d9f210eda845c81 (diff)
download	linux-1a50ede2b3c846761a71c409f53e9121311a13c2.tar.xz
Merge tag 'nfsd-5.11' of git://git.linux-nfs.org/projects/cel/cel-2.6
Pull nfsd updates from Chuck Lever:
 "Several substantial changes this time around:

   - Previously, exporting an NFS mount via NFSD was considered to be
     an unsupported feature. With v5.11, the community has attempted to
     make re-exporting a first-class feature of NFSD.

     This would enable the Linux in-kernel NFS server to be used as an
     intermediate cache for a remotely-located primary NFS server, for
     example, even with other NFS server implementations, like a NetApp
     filer, as the primary.

   - A short series of patches brings support for multiple RPC/RDMA
     data chunks per RPC transaction to the Linux NFS server's RPC/RDMA
     transport implementation. This is a part of the RPC/RDMA spec that
     the other premier NFS/RDMA implementation (Solaris) has had for a
     very long time, and completes the implementation of RPC/RDMA
     version 1 in the Linux kernel's NFS server.

   - Long ago, NFSv4 support was introduced to NFSD using a series of C
     macros that hid dprintk's and goto's. Over time, the kernel's XDR
     implementation has been greatly improved, but these C macros have
     remained and become fallow. A series of patches in this pull
     request completely replaces those macros with the use of current
     kernel XDR infrastructure. Benefits include:

       - More robust input sanitization in NFSD's NFSv4 XDR decoders.

       - Make it easier to use common kernel library functions that
         use XDR stream APIs (for example, GSS-API).

       - Align the structure of the source code with the RFCs so it is
         easier to learn, verify, and maintain our XDR implementation.

       - Removal of more than a hundred hidden dprintk() call sites.

       - Removal of some explicit manipulation of pages to help make
         the eventual transition to xdr->bvec smoother.

   - On top of several related fixes in 5.10-rc, there are a few more
     fixes to get the Linux NFSD implementation of NFSv4.2 inter-server
     copy up to speed.

  And as usual, there is a pinch of seasoning in the form of a
  collection of unrelated minor bug fixes and clean-ups.

  Many thanks to all who contributed this time around!"

* tag 'nfsd-5.11' of git://git.linux-nfs.org/projects/cel/cel-2.6: (131 commits)
  nfsd: Record NFSv4 pre/post-op attributes as non-atomic
  nfsd: Set PF_LOCAL_THROTTLE on local filesystems only
  nfsd: Fix up nfsd to ensure that timeout errors don't result in ESTALE
  exportfs: Add a function to return the raw output from fh_to_dentry()
  nfsd: close cached files prior to a REMOVE or RENAME that would replace target
  nfsd: allow filesystems to opt out of subtree checking
  nfsd: add a new EXPORT_OP_NOWCC flag to struct export_operations
  Revert "nfsd4: support change_attr_type attribute"
  nfsd4: don't query change attribute in v2/v3 case
  nfsd: minor nfsd4_change_attribute cleanup
  nfsd: simplify nfsd4_change_info
  nfsd: only call inode_query_iversion in the I_VERSION case
  nfs_common: need lock during iterate through the list
  NFSD: Fix 5 seconds delay when doing inter server copy
  NFSD: Fix sparse warning in nfs4proc.c
  SUNRPC: Remove XDRBUF_SPARSE_PAGES flag in gss_proxy upcall
  sunrpc: clean-up cache downcall
  nfsd: Fix message level for normal termination
  NFSD: Remove macros that are no longer used
  NFSD: Replace READ* macros in nfsd4_decode_compound()
  ...
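The RPC/RDMA series summarized above replaces on-the-fly __be32 header parsing with a parsed chunk list that later code simply walks (see the pcl_for_each_chunk() and pcl_for_each_segment() loops in the diff below). Here is a minimal userspace sketch of that data structure; the types and helper below are simplified stand-ins for the kernel's struct svc_rdma_chunk and struct svc_rdma_segment, not the real definitions.

/* Minimal userspace model of a parsed chunk list. The real kernel
 * structures carry more state, but the iteration pattern -- visit
 * each chunk, then each of its segments, without re-decoding XDR
 * words -- is the same.
 */
#include <stdint.h>
#include <stdio.h>

struct segment {
	uint32_t rs_handle;	/* RDMA rkey for this region */
	uint32_t rs_length;	/* length of the region, in bytes */
	uint64_t rs_offset;	/* remote offset of the region */
};

struct chunk {
	unsigned int ch_position;	/* XDR position in the RPC message */
	unsigned int ch_segcount;	/* number of segments */
	const struct segment *ch_segments;
};

/* Sum the payload bytes a chunk can carry, visiting each segment
 * exactly once -- the moral equivalent of pcl_for_each_segment().
 */
static uint64_t chunk_total_bytes(const struct chunk *ch)
{
	uint64_t total = 0;
	unsigned int i;

	for (i = 0; i < ch->ch_segcount; i++)
		total += ch->ch_segments[i].rs_length;
	return total;
}

int main(void)
{
	static const struct segment segs[] = {
		{ .rs_handle = 0xabc1, .rs_length = 4096, .rs_offset = 0 },
		{ .rs_handle = 0xabc2, .rs_length = 1024, .rs_offset = 4096 },
	};
	struct chunk ch = {
		.ch_position = 0, .ch_segcount = 2, .ch_segments = segs,
	};

	printf("chunk at position %u carries %llu bytes\n",
	       ch.ch_position,
	       (unsigned long long)chunk_total_bytes(&ch));
	return 0;
}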
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_rw.c')
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_rw.c	600
1 file changed, 439 insertions, 161 deletions
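As background for the READ* macro removal described in the pull message: NFSD's NFSv4 decoders move onto the kernel's bounds-checked xdr_stream helpers, where every read advances a cursor and is validated against the end of the buffer instead of bumping a bare pointer behind a macro. A small userspace sketch of that cursor-style pattern follows; the names here are local stand-ins, not the kernel's actual xdr_stream API.

/* Userspace sketch of cursor-style XDR decoding with bounds checks.
 * The kernel's xdr_stream helpers follow the same shape; this model
 * only illustrates the pattern.
 */
#include <arpa/inet.h>	/* ntohl */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct xdr_cursor {
	const uint8_t *p;	/* next byte to decode */
	const uint8_t *end;	/* one past the last valid byte */
};

/* Decode one big-endian 32-bit XDR word; fail instead of overrunning. */
static int cursor_decode_u32(struct xdr_cursor *c, uint32_t *val)
{
	uint32_t raw;

	if (c->end - c->p < 4)
		return -1;	/* short buffer: reject, don't overrun */
	memcpy(&raw, c->p, 4);
	c->p += 4;
	*val = ntohl(raw);
	return 0;
}

int main(void)
{
	/* Two XDR words: an operation number, then a length */
	const uint8_t buf[] = { 0, 0, 0, 22, 0, 0, 0, 8 };
	struct xdr_cursor c = { buf, buf + sizeof(buf) };
	uint32_t op, len;

	if (cursor_decode_u32(&c, &op) || cursor_decode_u32(&c, &len))
		return 1;
	printf("op=%u len=%u\n", op, len);
	return 0;
}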
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 80a0c0e87590..0b63e1321d74 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -190,14 +190,14 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
* - Stores arguments for the SGL constructor functions
*/
struct svc_rdma_write_info {
+ const struct svc_rdma_chunk *wi_chunk;
+
/* write state of this chunk */
unsigned int wi_seg_off;
unsigned int wi_seg_no;
- unsigned int wi_nsegs;
- __be32 *wi_segs;
/* SGL constructor arguments */
- struct xdr_buf *wi_xdr;
+ const struct xdr_buf *wi_xdr;
unsigned char *wi_base;
unsigned int wi_next_off;
@@ -205,7 +205,8 @@ struct svc_rdma_write_info {
};
static struct svc_rdma_write_info *
-svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_chunk *chunk)
{
struct svc_rdma_write_info *info;
@@ -213,10 +214,9 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
if (!info)
return info;
+ info->wi_chunk = chunk;
info->wi_seg_off = 0;
info->wi_seg_no = 0;
- info->wi_nsegs = be32_to_cpup(++chunk);
- info->wi_segs = ++chunk;
svc_rdma_cc_init(rdma, &info->wi_cc);
info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info;
@@ -258,11 +258,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
/* State for pulling a Read chunk.
*/
struct svc_rdma_read_info {
+ struct svc_rqst *ri_rqst;
struct svc_rdma_recv_ctxt *ri_readctxt;
- unsigned int ri_position;
unsigned int ri_pageno;
unsigned int ri_pageoff;
- unsigned int ri_chunklen;
+ unsigned int ri_totalbytes;
struct svc_rdma_chunk_ctxt ri_cc;
};
@@ -358,7 +358,6 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
do {
if (atomic_sub_return(cc->cc_sqecount,
&rdma->sc_sq_avail) > 0) {
- trace_svcrdma_post_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
if (ret)
break;
@@ -405,7 +404,7 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
struct svc_rdma_rw_ctxt *ctxt)
{
unsigned int sge_no, sge_bytes, page_off, page_no;
- struct xdr_buf *xdr = info->wi_xdr;
+ const struct xdr_buf *xdr = info->wi_xdr;
struct scatterlist *sg;
struct page **page;
@@ -443,40 +442,36 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
{
struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
struct svcxprt_rdma *rdma = cc->cc_rdma;
+ const struct svc_rdma_segment *seg;
struct svc_rdma_rw_ctxt *ctxt;
- __be32 *seg;
int ret;
- seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
do {
unsigned int write_len;
- u32 handle, length;
u64 offset;
- if (info->wi_seg_no >= info->wi_nsegs)
+ if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
goto out_overflow;
- xdr_decode_rdma_segment(seg, &handle, &length, &offset);
- offset += info->wi_seg_off;
-
- write_len = min(remaining, length - info->wi_seg_off);
+ seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
+ write_len = min(remaining, seg->rs_length - info->wi_seg_off);
+ if (!write_len)
+ goto out_overflow;
ctxt = svc_rdma_get_rw_ctxt(rdma,
(write_len >> PAGE_SHIFT) + 2);
if (!ctxt)
return -ENOMEM;
constructor(info, write_len, ctxt);
- ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle,
+ offset = seg->rs_offset + info->wi_seg_off;
+ ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
DMA_TO_DEVICE);
if (ret < 0)
return -EIO;
- trace_svcrdma_send_wseg(handle, write_len, offset);
-
list_add(&ctxt->rw_list, &cc->cc_rwctxts);
cc->cc_sqecount += ret;
- if (write_len == length - info->wi_seg_off) {
- seg += 4;
+ if (write_len == seg->rs_length - info->wi_seg_off) {
info->wi_seg_no++;
info->wi_seg_off = 0;
} else {
@@ -489,31 +484,46 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
out_overflow:
trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
- info->wi_nsegs);
+ info->wi_chunk->ch_segcount);
return -E2BIG;
}
-/* Send one of an xdr_buf's kvecs by itself. To send a Reply
- * chunk, the whole RPC Reply is written back to the client.
- * This function writes either the head or tail of the xdr_buf
- * containing the Reply.
+/**
+ * svc_rdma_iov_write - Construct RDMA Writes from an iov
+ * @info: pointer to write arguments
+ * @iov: kvec to write
+ *
+ * Returns:
+ * On success, returns zero
+ * %-E2BIG if the client-provided Write chunk is too small
+ * %-ENOMEM if a resource has been exhausted
+ * %-EIO if an rdma-rw error occurred
*/
-static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
- struct kvec *vec)
+static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
+ const struct kvec *iov)
{
- info->wi_base = vec->iov_base;
+ info->wi_base = iov->iov_base;
return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
- vec->iov_len);
+ iov->iov_len);
}
-/* Send an xdr_buf's page list by itself. A Write chunk is just
- * the page list. A Reply chunk is @xdr's head, page list, and
- * tail. This function is shared between the two types of chunk.
+/**
+ * svc_rdma_pages_write - Construct RDMA Writes from pages
+ * @info: pointer to write arguments
+ * @xdr: xdr_buf with pages to write
+ * @offset: offset into the content of @xdr
+ * @length: number of bytes to write
+ *
+ * Returns:
+ * On success, returns zero
+ * %-E2BIG if the client-provided Write chunk is too small
+ * %-ENOMEM if a resource has been exhausted
+ * %-EIO if an rdma-rw error occurred
*/
-static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
- struct xdr_buf *xdr,
- unsigned int offset,
- unsigned long length)
+static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
+ const struct xdr_buf *xdr,
+ unsigned int offset,
+ unsigned long length)
{
info->wi_xdr = xdr;
info->wi_next_off = offset - xdr->head[0].iov_len;
@@ -522,12 +532,48 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
}
/**
+ * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
+ * @xdr: xdr_buf to write
+ * @data: pointer to write arguments
+ *
+ * Returns:
+ * On success, returns the number of bytes written (xdr->len)
+ * %-E2BIG if the client-provided Write chunk is too small
+ * %-ENOMEM if a resource has been exhausted
+ * %-EIO if an rdma-rw error occurred
+ */
+static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
+{
+ struct svc_rdma_write_info *info = data;
+ int ret;
+
+ if (xdr->head[0].iov_len) {
+ ret = svc_rdma_iov_write(info, &xdr->head[0]);
+ if (ret < 0)
+ return ret;
+ }
+
+ if (xdr->page_len) {
+ ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
+ xdr->page_len);
+ if (ret < 0)
+ return ret;
+ }
+
+ if (xdr->tail[0].iov_len) {
+ ret = svc_rdma_iov_write(info, &xdr->tail[0]);
+ if (ret < 0)
+ return ret;
+ }
+
+ return xdr->len;
+}
+
+/**
* svc_rdma_send_write_chunk - Write all segments in a Write chunk
* @rdma: controlling RDMA transport
- * @wr_ch: Write chunk provided by client
+ * @chunk: Write chunk provided by the client
* @xdr: xdr_buf containing the data payload
- * @offset: payload's byte offset in @xdr
- * @length: size of payload, in bytes
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
@@ -536,30 +582,28 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
-int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
- struct xdr_buf *xdr,
- unsigned int offset, unsigned long length)
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_chunk *chunk,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
+ struct svc_rdma_chunk_ctxt *cc;
int ret;
- if (!length)
- return 0;
-
- info = svc_rdma_write_info_alloc(rdma, wr_ch);
+ info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info)
return -ENOMEM;
+ cc = &info->wi_cc;
- ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
- if (ret < 0)
+ ret = svc_rdma_xb_write(xdr, info);
+ if (ret != xdr->len)
goto out_err;
- ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+ trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
+ ret = svc_rdma_post_chunk_ctxt(cc);
if (ret < 0)
goto out_err;
-
- trace_svcrdma_send_write_chunk(xdr->page_len);
- return length;
+ return xdr->len;
out_err:
svc_rdma_write_info_free(info);
@@ -581,62 +625,62 @@ out_err:
*/
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
- struct xdr_buf *xdr)
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
- int consumed, ret;
+ struct svc_rdma_chunk_ctxt *cc;
+ struct svc_rdma_chunk *chunk;
+ int ret;
- info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
+ if (pcl_is_empty(&rctxt->rc_reply_pcl))
+ return 0;
+
+ chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+ info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info)
return -ENOMEM;
+ cc = &info->wi_cc;
- ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+ ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+ svc_rdma_xb_write, info);
if (ret < 0)
goto out_err;
- consumed = xdr->head[0].iov_len;
-
- /* Send the page list in the Reply chunk only if the
- * client did not provide Write chunks.
- */
- if (!rctxt->rc_write_list && xdr->page_len) {
- ret = svc_rdma_send_xdr_pagelist(info, xdr,
- xdr->head[0].iov_len,
- xdr->page_len);
- if (ret < 0)
- goto out_err;
- consumed += xdr->page_len;
- }
-
- if (xdr->tail[0].iov_len) {
- ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
- if (ret < 0)
- goto out_err;
- consumed += xdr->tail[0].iov_len;
- }
- ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+ trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
+ ret = svc_rdma_post_chunk_ctxt(cc);
if (ret < 0)
goto out_err;
- trace_svcrdma_send_reply_chunk(consumed);
- return consumed;
+ return xdr->len;
out_err:
svc_rdma_write_info_free(info);
return ret;
}
+/**
+ * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
+ * @info: context for ongoing I/O
+ * @segment: co-ordinates of remote memory to be read
+ *
+ * Returns:
+ * %0: the Read WR chain was constructed successfully
+ * %-EINVAL: there were not enough rq_pages to finish
+ * %-ENOMEM: allocating local resources failed
+ * %-EIO: a DMA mapping error occurred
+ */
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
- struct svc_rqst *rqstp,
- u32 rkey, u32 len, u64 offset)
+ const struct svc_rdma_segment *segment)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+ struct svc_rqst *rqstp = info->ri_rqst;
struct svc_rdma_rw_ctxt *ctxt;
- unsigned int sge_no, seg_len;
+ unsigned int sge_no, seg_len, len;
struct scatterlist *sg;
int ret;
+ len = segment->rs_length;
sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
if (!ctxt)
@@ -670,8 +714,8 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
goto out_overrun;
}
- ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, offset, rkey,
- DMA_FROM_DEVICE);
+ ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
+ segment->rs_handle, DMA_FROM_DEVICE);
if (ret < 0)
return -EIO;
@@ -684,54 +728,177 @@ out_overrun:
return -EINVAL;
}
-/* Walk the segments in the Read chunk starting at @p and construct
- * RDMA Read operations to pull the chunk to the server.
+/**
+ * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
+ * @info: context for ongoing I/O
+ * @chunk: Read chunk to pull
+ *
+ * Return values:
+ * %0: the Read WR chain was constructed successfully
+ * %-EINVAL: there were not enough resources to finish
+ * %-ENOMEM: allocating local resources failed
+ * %-EIO: a DMA mapping error occurred
*/
-static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
- struct svc_rdma_read_info *info,
- __be32 *p)
+static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
+ const struct svc_rdma_chunk *chunk)
{
+ const struct svc_rdma_segment *segment;
int ret;
ret = -EINVAL;
- info->ri_chunklen = 0;
- while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
- u32 handle, length;
- u64 offset;
+ pcl_for_each_segment(segment, chunk) {
+ ret = svc_rdma_build_read_segment(info, segment);
+ if (ret < 0)
+ break;
+ info->ri_totalbytes += segment->rs_length;
+ }
+ return ret;
+}
+
+/**
+ * svc_rdma_copy_inline_range - Copy part of the inline content into pages
+ * @info: context for RDMA Reads
+ * @offset: offset into the Receive buffer of region to copy
+ * @remaining: length of region to copy
+ *
+ * Take a page at a time from rqstp->rq_pages and copy the inline
+ * content from the Receive buffer into that page. Update
+ * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
+ * result will land contiguously with the copied content.
+ *
+ * Return values:
+ * %0: Inline content was successfully copied
+ * %-EINVAL: offset or length was incorrect
+ */
+static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
+ unsigned int offset,
+ unsigned int remaining)
+{
+ struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+ unsigned char *dst, *src = head->rc_recv_buf;
+ struct svc_rqst *rqstp = info->ri_rqst;
+ unsigned int page_no, numpages;
+
+ numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
+ for (page_no = 0; page_no < numpages; page_no++) {
+ unsigned int page_len;
+
+ page_len = min_t(unsigned int, remaining,
+ PAGE_SIZE - info->ri_pageoff);
+
+ head->rc_arg.pages[info->ri_pageno] =
+ rqstp->rq_pages[info->ri_pageno];
+ if (!info->ri_pageoff)
+ head->rc_page_count++;
+
+ dst = page_address(head->rc_arg.pages[info->ri_pageno]);
+ memcpy(dst + info->ri_pageoff, src + offset, page_len);
+
+ info->ri_totalbytes += page_len;
+ info->ri_pageoff += page_len;
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ remaining -= page_len;
+ offset += page_len;
+ }
+
+ return 0;
+}
+
+/**
+ * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
+ * @info: context for RDMA Reads
+ *
+ * The chunk data lands in head->rc_arg as a series of contiguous pages,
+ * like an incoming TCP call.
+ *
+ * Return values:
+ * %0: RDMA Read WQEs were successfully built
+ * %-EINVAL: client provided too many chunks or segments,
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
+{
+ struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+ const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
+ struct svc_rdma_chunk *chunk, *next;
+ struct xdr_buf *buf = &head->rc_arg;
+ unsigned int start, length;
+ int ret;
- p = xdr_decode_rdma_segment(p, &handle, &length, &offset);
- ret = svc_rdma_build_read_segment(info, rqstp, handle, length,
- offset);
+ start = 0;
+ chunk = pcl_first_chunk(pcl);
+ length = chunk->ch_position;
+ ret = svc_rdma_copy_inline_range(info, start, length);
+ if (ret < 0)
+ return ret;
+
+ pcl_for_each_chunk(chunk, pcl) {
+ ret = svc_rdma_build_read_chunk(info, chunk);
if (ret < 0)
+ return ret;
+
+ next = pcl_next_chunk(pcl, chunk);
+ if (!next)
break;
- trace_svcrdma_send_rseg(handle, length, offset);
- info->ri_chunklen += length;
+ start += length;
+ length = next->ch_position - info->ri_totalbytes;
+ ret = svc_rdma_copy_inline_range(info, start, length);
+ if (ret < 0)
+ return ret;
}
- return ret;
+ start += length;
+ length = head->rc_byte_len - start;
+ ret = svc_rdma_copy_inline_range(info, start, length);
+ if (ret < 0)
+ return ret;
+
+ buf->len += info->ri_totalbytes;
+ buf->buflen += info->ri_totalbytes;
+
+ head->rc_hdr_count = 1;
+ buf->head[0].iov_base = page_address(head->rc_pages[0]);
+ buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
+ buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
+ return 0;
}
-/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
- * data lands in the page list of head->rc_arg.pages.
+/**
+ * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
+ * @info: context for RDMA Reads
+ *
+ * The chunk data lands in the page list of head->rc_arg.pages.
*
* Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
* Therefore, XDR round-up of the Read chunk and trailing
* inline content must both be added at the end of the pagelist.
+ *
+ * Return values:
+ * %0: RDMA Read WQEs were successfully built
+ * %-EINVAL: client provided too many chunks or segments,
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
- struct svc_rdma_read_info *info,
- __be32 *p)
+static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+ struct xdr_buf *buf = &head->rc_arg;
+ struct svc_rdma_chunk *chunk;
+ unsigned int length;
int ret;
- ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ chunk = pcl_first_chunk(&head->rc_read_pcl);
+ ret = svc_rdma_build_read_chunk(info, chunk);
if (ret < 0)
goto out;
- trace_svcrdma_send_read_chunk(info->ri_chunklen, info->ri_position);
-
head->rc_hdr_count = 0;
/* Split the Receive buffer between the head and tail
@@ -739,11 +906,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
* chunk is not included in either the pagelist or in
* the tail.
*/
- head->rc_arg.tail[0].iov_base =
- head->rc_arg.head[0].iov_base + info->ri_position;
- head->rc_arg.tail[0].iov_len =
- head->rc_arg.head[0].iov_len - info->ri_position;
- head->rc_arg.head[0].iov_len = info->ri_position;
+ buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
+ buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
+ buf->head[0].iov_len = chunk->ch_position;
/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
*
@@ -754,50 +919,149 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
* Currently these chunks always start at page offset 0,
* thus the rounded-up length never crosses a page boundary.
*/
- info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
-
- head->rc_arg.page_len = info->ri_chunklen;
- head->rc_arg.len += info->ri_chunklen;
- head->rc_arg.buflen += info->ri_chunklen;
+ length = XDR_QUADLEN(info->ri_totalbytes) << 2;
+ buf->page_len = length;
+ buf->len += length;
+ buf->buflen += length;
out:
return ret;
}
-/* Construct RDMA Reads to pull over a Position Zero Read chunk.
- * The start of the data lands in the first page just after
- * the Transport header, and the rest lands in the page list of
+/**
+ * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
+ * @info: context for RDMA Reads
+ * @chunk: parsed Call chunk to pull
+ * @offset: offset of region to pull
+ * @length: length of region to pull
+ *
+ * Return values:
+ * %0: RDMA Read WQEs were successfully built
+ * %-EINVAL: there were not enough resources to finish
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
+ const struct svc_rdma_chunk *chunk,
+ unsigned int offset, unsigned int length)
+{
+ const struct svc_rdma_segment *segment;
+ int ret;
+
+ ret = -EINVAL;
+ pcl_for_each_segment(segment, chunk) {
+ struct svc_rdma_segment dummy;
+
+ if (offset > segment->rs_length) {
+ offset -= segment->rs_length;
+ continue;
+ }
+
+ dummy.rs_handle = segment->rs_handle;
+ dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
+ dummy.rs_offset = segment->rs_offset + offset;
+
+ ret = svc_rdma_build_read_segment(info, &dummy);
+ if (ret < 0)
+ break;
+
+ info->ri_totalbytes += dummy.rs_length;
+ length -= dummy.rs_length;
+ offset = 0;
+ }
+ return ret;
+}
+
+/**
+ * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
+ * @info: context for RDMA Reads
+ *
+ * Return values:
+ * %0: RDMA Read WQEs were successfully built
+ * %-EINVAL: there were not enough resources to finish
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
+{
+ struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+ const struct svc_rdma_chunk *call_chunk =
+ pcl_first_chunk(&head->rc_call_pcl);
+ const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
+ struct svc_rdma_chunk *chunk, *next;
+ unsigned int start, length;
+ int ret;
+
+ if (pcl_is_empty(pcl))
+ return svc_rdma_build_read_chunk(info, call_chunk);
+
+ start = 0;
+ chunk = pcl_first_chunk(pcl);
+ length = chunk->ch_position;
+ ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
+ if (ret < 0)
+ return ret;
+
+ pcl_for_each_chunk(chunk, pcl) {
+ ret = svc_rdma_build_read_chunk(info, chunk);
+ if (ret < 0)
+ return ret;
+
+ next = pcl_next_chunk(pcl, chunk);
+ if (!next)
+ break;
+
+ start += length;
+ length = next->ch_position - info->ri_totalbytes;
+ ret = svc_rdma_read_chunk_range(info, call_chunk,
+ start, length);
+ if (ret < 0)
+ return ret;
+ }
+
+ start += length;
+ length = call_chunk->ch_length - start;
+ return svc_rdma_read_chunk_range(info, call_chunk, start, length);
+}
+
+/**
+ * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
+ * @info: context for RDMA Reads
+ *
+ * The start of the data lands in the first page just after the
+ * Transport header, and the rest lands in the page list of
* head->rc_arg.pages.
*
* Assumptions:
- * - A PZRC has an XDR-aligned length (no implicit round-up).
- * - There can be no trailing inline content (IOW, we assume
- * a PZRC is never sent in an RDMA_MSG message, though it's
- * allowed by spec).
+ * - A PZRC is never sent in an RDMA_MSG message, though it's
+ * allowed by spec.
+ *
+ * Return values:
+ * %0: RDMA Read WQEs were successfully built
+ * %-EINVAL: client provided too many chunks or segments,
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
- struct svc_rdma_read_info *info,
- __be32 *p)
+static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+ struct xdr_buf *buf = &head->rc_arg;
int ret;
- ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ ret = svc_rdma_read_call_chunk(info);
if (ret < 0)
goto out;
- trace_svcrdma_send_pzr(info->ri_chunklen);
-
- head->rc_arg.len += info->ri_chunklen;
- head->rc_arg.buflen += info->ri_chunklen;
+ buf->len += info->ri_totalbytes;
+ buf->buflen += info->ri_totalbytes;
head->rc_hdr_count = 1;
- head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]);
- head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE,
- info->ri_chunklen);
-
- head->rc_arg.page_len = info->ri_chunklen -
- head->rc_arg.head[0].iov_len;
+ buf->head[0].iov_base = page_address(head->rc_pages[0]);
+ buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
+ buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
out:
return ret;
@@ -824,26 +1088,34 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
}
/**
- * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * svc_rdma_process_read_list - Pull list of Read chunks from the client
* @rdma: controlling RDMA transport
* @rqstp: set of pages to use as Read sink buffers
* @head: pages under I/O collect here
- * @p: pointer to start of Read chunk
*
- * Returns:
- * %0 if all needed RDMA Reads were posted successfully,
- * %-EINVAL if client provided too many segments,
- * %-ENOMEM if rdma_rw context pool was exhausted,
- * %-ENOTCONN if posting failed (connection is lost),
- * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
+ * pull each Read chunk as they decode an incoming RPC message.
*
- * Assumptions:
- * - All Read segments in @p have the same Position value.
+ * On Linux, however, the server needs to have a fully-constructed RPC
+ * message in rqstp->rq_arg when there is a positive return code from
+ * ->xpo_recvfrom. So the Read list is safety-checked immediately when
+ * it is received, then here the whole Read list is pulled all at once.
+ * The ingress RPC message is fully reconstructed once all associated
+ * RDMA Reads have completed.
+ *
+ * Return values:
+ * %1: all needed RDMA Reads were posted successfully,
+ * %-EINVAL: client provided too many chunks or segments,
+ * %-ENOMEM: rdma_rw context pool was exhausted,
+ * %-ENOTCONN: posting failed (connection is lost),
+ * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
- struct svc_rdma_recv_ctxt *head, __be32 *p)
+int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
{
struct svc_rdma_read_info *info;
+ struct svc_rdma_chunk_ctxt *cc;
int ret;
/* The request (with page list) is constructed in
@@ -861,23 +1133,29 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
info = svc_rdma_read_info_alloc(rdma);
if (!info)
return -ENOMEM;
+ cc = &info->ri_cc;
+ info->ri_rqst = rqstp;
info->ri_readctxt = head;
info->ri_pageno = 0;
info->ri_pageoff = 0;
-
- info->ri_position = be32_to_cpup(p + 1);
- if (info->ri_position)
- ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
- else
- ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+ info->ri_totalbytes = 0;
+
+ if (pcl_is_empty(&head->rc_call_pcl)) {
+ if (head->rc_read_pcl.cl_count == 1)
+ ret = svc_rdma_read_data_item(info);
+ else
+ ret = svc_rdma_read_multiple_chunks(info);
+ } else
+ ret = svc_rdma_read_special(info);
if (ret < 0)
goto out_err;
- ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
+ trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
+ ret = svc_rdma_post_chunk_ctxt(cc);
if (ret < 0)
goto out_err;
svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
- return 0;
+ return 1;
out_err:
svc_rdma_read_info_free(info);