Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_rw.c')
 net/sunrpc/xprtrdma/svc_rdma_rw.c | 450
 1 file changed, 188 insertions(+), 262 deletions(-)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index e460e25a1d6d..c00fcce61d1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -39,6 +39,7 @@ struct svc_rdma_rw_ctxt {
struct list_head rw_list;
struct rdma_rw_ctx rw_ctx;
unsigned int rw_nents;
+ unsigned int rw_first_sgl_nents;
struct sg_table rw_sg_table;
struct scatterlist rw_first_sgl[];
};
@@ -53,6 +54,8 @@ svc_rdma_next_ctxt(struct list_head *list)
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
+ struct ib_device *dev = rdma->sc_cm_id->device;
+ unsigned int first_sgl_nents = dev->attrs.max_send_sge;
struct svc_rdma_rw_ctxt *ctxt;
struct llist_node *node;
@@ -62,32 +65,33 @@ svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
if (node) {
ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
} else {
- ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
- GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device));
+ ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
+ GFP_KERNEL, ibdev_to_node(dev));
if (!ctxt)
goto out_noctx;
INIT_LIST_HEAD(&ctxt->rw_list);
+ ctxt->rw_first_sgl_nents = first_sgl_nents;
}
ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
ctxt->rw_sg_table.sgl,
- SG_CHUNK_SIZE))
+ first_sgl_nents))
goto out_free;
return ctxt;
out_free:
kfree(ctxt);
out_noctx:
- trace_svcrdma_no_rwctx_err(rdma, sges);
+ trace_svcrdma_rwctx_empty(rdma, sges);
return NULL;
}
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
struct llist_head *list)
{
- sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
+ sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
llist_add(&ctxt->rw_node, list);
}
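
An illustrative, self-contained sketch of the allocation pattern this hunk switches to: the inline first SGL is sized from the device's advertised max_send_sge rather than the fixed SG_CHUNK_SIZE. The names below are hypothetical and error handling is trimmed; sg_alloc_table_chained() chains extra entries only when the request exceeds the inline array.

#include <linux/overflow.h>
#include <linux/scatterlist.h>
#include <linux/slab.h>
#include <rdma/ib_verbs.h>

struct demo_rw_ctxt {
	struct sg_table		sgt;
	unsigned int		first_sgl_nents;
	struct scatterlist	first_sgl[];	/* sized at allocation time */
};

static struct demo_rw_ctxt *demo_get_rw_ctxt(struct ib_device *dev,
					     unsigned int sges)
{
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct demo_rw_ctxt *ctxt;

	ctxt = kmalloc_node(struct_size(ctxt, first_sgl, first_sgl_nents),
			    GFP_KERNEL, ibdev_to_node(dev));
	if (!ctxt)
		return NULL;
	ctxt->first_sgl_nents = first_sgl_nents;

	/* sg_alloc_table_chained() uses sgt.sgl as the inline chunk, so it
	 * must point at the flexible array before the call; additional
	 * entries are chained in only when @sges exceeds first_sgl_nents.
	 */
	ctxt->sgt.sgl = ctxt->first_sgl;
	if (sg_alloc_table_chained(&ctxt->sgt, sges, ctxt->sgt.sgl,
				   first_sgl_nents)) {
		kfree(ctxt);
		return NULL;
	}
	return ctxt;
}
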
@@ -135,57 +139,40 @@ static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
ctxt->rw_sg_table.sgl, ctxt->rw_nents,
0, offset, handle, direction);
if (unlikely(ret < 0)) {
+ trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
+ ctxt->rw_nents, ret);
svc_rdma_put_rw_ctxt(rdma, ctxt);
- trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
}
return ret;
}
-/* A chunk context tracks all I/O for moving one Read or Write
- * chunk. This is a set of rdma_rw's that handle data movement
- * for all segments of one chunk.
- *
- * These are small, acquired with a single allocator call, and
- * no more than one is needed per chunk. They are allocated on
- * demand, and not cached.
+/**
+ * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be initialized
*/
-struct svc_rdma_chunk_ctxt {
- struct rpc_rdma_cid cc_cid;
- struct ib_cqe cc_cqe;
- struct svcxprt_rdma *cc_rdma;
- struct list_head cc_rwctxts;
- ktime_t cc_posttime;
- int cc_sqecount;
- enum ib_wc_status cc_status;
- struct completion cc_done;
-};
-
-static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
- struct rpc_rdma_cid *cid)
+void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc)
{
- cid->ci_queue_id = rdma->sc_sq_cq->res.id;
- cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
-}
+ struct rpc_rdma_cid *cid = &cc->cc_cid;
-static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
- struct svc_rdma_chunk_ctxt *cc)
-{
- svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
- cc->cc_rdma = rdma;
+ if (unlikely(!cid->ci_completion_id))
+ svc_rdma_send_cid_init(rdma, cid);
INIT_LIST_HEAD(&cc->cc_rwctxts);
cc->cc_sqecount = 0;
}
-/*
- * The consumed rw_ctx's are cleaned and placed on a local llist so
- * that only one atomic llist operation is needed to put them all
- * back on the free list.
+/**
+ * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be released
+ * @dir: DMA direction
*/
-static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
- enum dma_data_direction dir)
+void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir)
{
- struct svcxprt_rdma *rdma = cc->cc_rdma;
struct llist_node *first, *last;
struct svc_rdma_rw_ctxt *ctxt;
LLIST_HEAD(free);
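
The comment removed above explained why consumed rw_ctxts are first cleaned onto a local llist: the whole batch can then be returned to the shared free list with one atomic operation. A minimal sketch of that idiom, with hypothetical names:

#include <linux/llist.h>

/* Collect entries locally, then return the whole batch to the shared
 * free list with a single atomic llist_add_batch().
 */
static void demo_splice_to_free_list(struct llist_head *local,
				     struct llist_head *shared)
{
	struct llist_node *first = llist_del_all(local);
	struct llist_node *last;

	if (!first)
		return;
	for (last = first; last->next; last = last->next)
		;
	llist_add_batch(first, last, shared);
}
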
@@ -215,6 +202,8 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
* - Stores arguments for the SGL constructor functions
*/
struct svc_rdma_write_info {
+ struct svcxprt_rdma *wi_rdma;
+
const struct svc_rdma_chunk *wi_chunk;
/* write state of this chunk */
@@ -227,6 +216,7 @@ struct svc_rdma_write_info {
unsigned int wi_next_off;
struct svc_rdma_chunk_ctxt wi_cc;
+ struct work_struct wi_work;
};
static struct svc_rdma_write_info *
@@ -235,25 +225,33 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
{
struct svc_rdma_write_info *info;
- info = kmalloc_node(sizeof(*info), GFP_KERNEL,
+ info = kzalloc_node(sizeof(*info), GFP_KERNEL,
ibdev_to_node(rdma->sc_cm_id->device));
if (!info)
return info;
+ info->wi_rdma = rdma;
info->wi_chunk = chunk;
- info->wi_seg_off = 0;
- info->wi_seg_no = 0;
svc_rdma_cc_init(rdma, &info->wi_cc);
info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info;
}
-static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+static void svc_rdma_write_info_free_async(struct work_struct *work)
{
- svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
+ struct svc_rdma_write_info *info;
+
+ info = container_of(work, struct svc_rdma_write_info, wi_work);
+ svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
kfree(info);
}
+static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+{
+ INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
+ queue_work(svcrdma_wq, &info->wi_work);
+}
+
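
svc_rdma_write_info_free() now defers the release to a workqueue instead of tearing down rdma_rw contexts directly in the Send completion handler. A generic sketch of that deferred-free idiom, assuming system_wq as a stand-in for the patch's svcrdma_wq:

#include <linux/slab.h>
#include <linux/workqueue.h>

struct demo_info {
	struct work_struct	work;
	/* resources whose release may sleep */
};

static void demo_free_async(struct work_struct *work)
{
	struct demo_info *info = container_of(work, struct demo_info, work);

	/* expensive or sleeping teardown happens in process context */
	kfree(info);
}

/* Safe to call from a completion handler */
static void demo_free(struct demo_info *info)
{
	INIT_WORK(&info->work, demo_free_async);
	queue_work(system_wq, &info->work);
}
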
/**
* svc_rdma_write_done - Write chunk completion
* @cq: controlling Completion Queue
@@ -263,16 +261,16 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
*/
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
+ struct svcxprt_rdma *rdma = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_chunk_ctxt *cc =
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
- struct svcxprt_rdma *rdma = cc->cc_rdma;
struct svc_rdma_write_info *info =
container_of(cc, struct svc_rdma_write_info, wi_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
- trace_svcrdma_wc_write(wc, &cc->cc_cid);
+ trace_svcrdma_wc_write(&cc->cc_cid);
break;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
@@ -289,39 +287,6 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
svc_rdma_write_info_free(info);
}
-/* State for pulling a Read chunk.
- */
-struct svc_rdma_read_info {
- struct svc_rqst *ri_rqst;
- struct svc_rdma_recv_ctxt *ri_readctxt;
- unsigned int ri_pageno;
- unsigned int ri_pageoff;
- unsigned int ri_totalbytes;
-
- struct svc_rdma_chunk_ctxt ri_cc;
-};
-
-static struct svc_rdma_read_info *
-svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
-{
- struct svc_rdma_read_info *info;
-
- info = kmalloc_node(sizeof(*info), GFP_KERNEL,
- ibdev_to_node(rdma->sc_cm_id->device));
- if (!info)
- return info;
-
- svc_rdma_cc_init(rdma, &info->ri_cc);
- info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
- return info;
-}
-
-static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
-{
- svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
- kfree(info);
-}
-
/**
* svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
* @cq: controlling Completion Queue
@@ -330,17 +295,27 @@ static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
*/
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
+ struct svcxprt_rdma *rdma = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_chunk_ctxt *cc =
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
- struct svc_rdma_read_info *info;
+ struct svc_rdma_recv_ctxt *ctxt;
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+
+ ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
- info = container_of(cc, struct svc_rdma_read_info, ri_cc);
- trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes,
+ trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
cc->cc_posttime);
- break;
+
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
+ /* the unlock pairs with the smp_rmb in svc_xprt_ready */
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
break;
@@ -348,10 +323,13 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(cc->cc_rdma, cc->cc_sqecount);
- cc->cc_status = wc->status;
- complete(&cc->cc_done);
- return;
+ /* The RDMA Read has flushed, so the incoming RPC message
+ * cannot be constructed and must be dropped. Signal the
+ * loss to the client by closing the connection.
+ */
+ svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
+ svc_rdma_recv_ctxt_put(rdma, ctxt);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
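
On success, the Read completion no longer completes a waiter; it queues the receive context for the upper layer and wakes the transport. A stripped-down sketch of that hand-off with hypothetical types (the real code uses sc_rq_dto_lock, XPT_DATA, and svc_xprt_enqueue()):

#include <linux/bitops.h>
#include <linux/list.h>
#include <linux/spinlock.h>

#define DEMO_XPT_DATA	0	/* "receive data ready" flag bit */

struct demo_xprt {
	spinlock_t		lock;
	struct list_head	read_complete_q;
	unsigned long		flags;
};

static void demo_read_complete(struct demo_xprt *xprt,
			       struct list_head *ctxt_entry)
{
	spin_lock(&xprt->lock);
	list_add_tail(ctxt_entry, &xprt->read_complete_q);
	/* set the flag before dropping the lock so the waker sees it */
	set_bit(DEMO_XPT_DATA, &xprt->flags);
	spin_unlock(&xprt->lock);
	/* then kick the consumer, e.g. svc_xprt_enqueue() upstream */
}
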
/*
@@ -360,9 +338,9 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
* even if one or more WRs are flushed. This is true when posting
* an rdma_rw_ctx or when posting a single signaled WR.
*/
-static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
+static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc)
{
- struct svcxprt_rdma *rdma = cc->cc_rdma;
struct ib_send_wr *first_wr;
const struct ib_send_wr *bad_wr;
struct list_head *tmp;
@@ -396,14 +374,14 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
}
percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma);
+ trace_svcrdma_sq_full(rdma, &cc->cc_cid);
atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
wait_event(rdma->sc_send_wait,
atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
- trace_svcrdma_sq_retry(rdma);
+ trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
} while (1);
- trace_svcrdma_sq_post_err(rdma, ret);
+ trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
svc_xprt_deferred_close(&rdma->sc_xprt);
/* If even one was posted, there will be a completion. */
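
Only the SQ-starvation path is visible in this hunk. For context, here is a standalone sketch of the reserve-or-wait accounting it belongs to; the exact upstream success check is not shown above, so the shape below is an assumption:

#include <linux/atomic.h>
#include <linux/wait.h>

/* Reserve @needed SQEs; on overcommit, return them and sleep until the
 * completion path replenishes the counter and wakes us.
 */
static void demo_reserve_sqes(atomic_t *sq_avail,
			      wait_queue_head_t *send_wait, int needed)
{
	while (atomic_sub_return(needed, sq_avail) <= 0) {
		atomic_add(needed, sq_avail);
		wait_event(*send_wait, atomic_read(sq_avail) > needed);
	}
}
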
@@ -473,7 +451,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
unsigned int remaining)
{
struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
- struct svcxprt_rdma *rdma = cc->cc_rdma;
+ struct svcxprt_rdma *rdma = info->wi_rdma;
const struct svc_rdma_segment *seg;
struct svc_rdma_rw_ctxt *ctxt;
int ret;
@@ -516,7 +494,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
return 0;
out_overflow:
- trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
+ trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
info->wi_chunk->ch_segcount);
return -E2BIG;
}
@@ -633,7 +611,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
goto out_err;
trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(cc);
+ ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0)
goto out_err;
return xdr->len;
@@ -680,7 +658,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
goto out_err;
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(cc);
+ ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0)
goto out_err;
@@ -693,7 +671,8 @@ out_err:
/**
* svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
- * @info: context for ongoing I/O
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
* @segment: co-ordinates of remote memory to be read
*
* Returns:
@@ -702,20 +681,20 @@ out_err:
* %-ENOMEM: allocating a local resources failed
* %-EIO: a DMA mapping error occurred
*/
-static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
+static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
const struct svc_rdma_segment *segment)
{
- struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
- struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
- struct svc_rqst *rqstp = info->ri_rqst;
+ struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
+ struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
unsigned int sge_no, seg_len, len;
struct svc_rdma_rw_ctxt *ctxt;
struct scatterlist *sg;
int ret;
len = segment->rs_length;
- sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
- ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
+ sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
+ ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
if (!ctxt)
return -ENOMEM;
ctxt->rw_nents = sge_no;
@@ -723,29 +702,27 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
sg = ctxt->rw_sg_table.sgl;
for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
seg_len = min_t(unsigned int, len,
- PAGE_SIZE - info->ri_pageoff);
+ PAGE_SIZE - head->rc_pageoff);
- if (!info->ri_pageoff)
+ if (!head->rc_pageoff)
head->rc_page_count++;
- sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
- seg_len, info->ri_pageoff);
+ sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
+ seg_len, head->rc_pageoff);
sg = sg_next(sg);
- info->ri_pageoff += seg_len;
- if (info->ri_pageoff == PAGE_SIZE) {
- info->ri_pageno++;
- info->ri_pageoff = 0;
+ head->rc_pageoff += seg_len;
+ if (head->rc_pageoff == PAGE_SIZE) {
+ head->rc_curpage++;
+ head->rc_pageoff = 0;
}
len -= seg_len;
- /* Safety check */
- if (len &&
- &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
+ if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
goto out_overrun;
}
- ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
+ ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
segment->rs_handle, DMA_FROM_DEVICE);
if (ret < 0)
return -EIO;
@@ -756,13 +733,14 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
return 0;
out_overrun:
- trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
+ trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
return -EINVAL;
}
/**
* svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
- * @info: context for ongoing I/O
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
* @chunk: Read chunk to pull
*
* Return values:
@@ -771,7 +749,8 @@ out_overrun:
* %-ENOMEM: allocating a local resources failed
* %-EIO: a DMA mapping error occurred
*/
-static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
+static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
const struct svc_rdma_chunk *chunk)
{
const struct svc_rdma_segment *segment;
@@ -779,56 +758,56 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
ret = -EINVAL;
pcl_for_each_segment(segment, chunk) {
- ret = svc_rdma_build_read_segment(info, segment);
+ ret = svc_rdma_build_read_segment(rqstp, head, segment);
if (ret < 0)
break;
- info->ri_totalbytes += segment->rs_length;
+ head->rc_readbytes += segment->rs_length;
}
return ret;
}
/**
* svc_rdma_copy_inline_range - Copy part of the inline content into pages
- * @info: context for RDMA Reads
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
* @offset: offset into the Receive buffer of region to copy
* @remaining: length of region to copy
*
* Take a page at a time from rqstp->rq_pages and copy the inline
* content from the Receive buffer into that page. Update
- * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
+ * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
* result will land contiguously with the copied content.
*
* Return values:
* %0: Inline content was successfully copied
* %-EINVAL: offset or length was incorrect
*/
-static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
+static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
unsigned int offset,
unsigned int remaining)
{
- struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
unsigned char *dst, *src = head->rc_recv_buf;
- struct svc_rqst *rqstp = info->ri_rqst;
unsigned int page_no, numpages;
- numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
+ numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
for (page_no = 0; page_no < numpages; page_no++) {
unsigned int page_len;
page_len = min_t(unsigned int, remaining,
- PAGE_SIZE - info->ri_pageoff);
+ PAGE_SIZE - head->rc_pageoff);
- if (!info->ri_pageoff)
+ if (!head->rc_pageoff)
head->rc_page_count++;
- dst = page_address(rqstp->rq_pages[info->ri_pageno]);
- memcpy(dst + info->ri_pageno, src + offset, page_len);
+ dst = page_address(rqstp->rq_pages[head->rc_curpage]);
+ memcpy(dst + head->rc_curpage, src + offset, page_len);
- info->ri_totalbytes += page_len;
- info->ri_pageoff += page_len;
- if (info->ri_pageoff == PAGE_SIZE) {
- info->ri_pageno++;
- info->ri_pageoff = 0;
+ head->rc_readbytes += page_len;
+ head->rc_pageoff += page_len;
+ if (head->rc_pageoff == PAGE_SIZE) {
+ head->rc_curpage++;
+ head->rc_pageoff = 0;
}
remaining -= page_len;
offset += page_len;
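
svc_rdma_copy_inline_range() advances the rc_curpage/rc_pageoff cursor so that the next RDMA Read payload lands contiguously after the copied inline bytes. A hypothetical, self-contained rendering of that cursor arithmetic:

#include <linux/minmax.h>
#include <linux/mm.h>
#include <linux/string.h>

static void demo_copy_to_pages(struct page **pages, unsigned int *curpage,
			       unsigned int *pageoff,
			       const unsigned char *src, unsigned int remaining)
{
	while (remaining) {
		unsigned int len = min_t(unsigned int, remaining,
					 PAGE_SIZE - *pageoff);
		unsigned char *dst = page_address(pages[*curpage]);

		memcpy(dst + *pageoff, src, len);
		src += len;
		remaining -= len;
		*pageoff += len;
		if (*pageoff == PAGE_SIZE) {	/* page filled: move cursor */
			(*curpage)++;
			*pageoff = 0;
		}
	}
}
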
@@ -839,7 +818,8 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
/**
* svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
- * @info: context for RDMA Reads
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
*
* The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
* like an incoming TCP call.
@@ -851,11 +831,11 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
+static noinline int
+svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
{
- struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
- struct xdr_buf *buf = &info->ri_rqst->rq_arg;
struct svc_rdma_chunk *chunk, *next;
unsigned int start, length;
int ret;
@@ -863,12 +843,12 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf
start = 0;
chunk = pcl_first_chunk(pcl);
length = chunk->ch_position;
- ret = svc_rdma_copy_inline_range(info, start, length);
+ ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
if (ret < 0)
return ret;
pcl_for_each_chunk(chunk, pcl) {
- ret = svc_rdma_build_read_chunk(info, chunk);
+ ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
if (ret < 0)
return ret;
@@ -877,31 +857,21 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf
break;
start += length;
- length = next->ch_position - info->ri_totalbytes;
- ret = svc_rdma_copy_inline_range(info, start, length);
+ length = next->ch_position - head->rc_readbytes;
+ ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
if (ret < 0)
return ret;
}
start += length;
length = head->rc_byte_len - start;
- ret = svc_rdma_copy_inline_range(info, start, length);
- if (ret < 0)
- return ret;
-
- buf->len += info->ri_totalbytes;
- buf->buflen += info->ri_totalbytes;
-
- buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
- buf->pages = &info->ri_rqst->rq_pages[1];
- buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
- return 0;
+ return svc_rdma_copy_inline_range(rqstp, head, start, length);
}
/**
* svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
- * @info: context for RDMA Reads
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
*
* The chunk data lands in the page list of rqstp->rq_arg.pages.
*
@@ -916,50 +886,17 @@ static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *inf
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
+static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
{
- struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
- struct xdr_buf *buf = &info->ri_rqst->rq_arg;
- struct svc_rdma_chunk *chunk;
- unsigned int length;
- int ret;
-
- chunk = pcl_first_chunk(&head->rc_read_pcl);
- ret = svc_rdma_build_read_chunk(info, chunk);
- if (ret < 0)
- goto out;
-
- /* Split the Receive buffer between the head and tail
- * buffers at Read chunk's position. XDR roundup of the
- * chunk is not included in either the pagelist or in
- * the tail.
- */
- buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
- buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
- buf->head[0].iov_len = chunk->ch_position;
-
- /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
- *
- * If the client already rounded up the chunk length, the
- * length does not change. Otherwise, the length of the page
- * list is increased to include XDR round-up.
- *
- * Currently these chunks always start at page offset 0,
- * thus the rounded-up length never crosses a page boundary.
- */
- buf->pages = &info->ri_rqst->rq_pages[0];
- length = xdr_align_size(chunk->ch_length);
- buf->page_len = length;
- buf->len += length;
- buf->buflen += length;
-
-out:
- return ret;
+ return svc_rdma_build_read_chunk(rqstp, head,
+ pcl_first_chunk(&head->rc_read_pcl));
}
/**
- * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
- * @info: context for RDMA Reads
+ * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
* @chunk: parsed Call chunk to pull
* @offset: offset of region to pull
* @length: length of region to pull
@@ -971,7 +908,8 @@ out:
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
+static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
const struct svc_rdma_chunk *chunk,
unsigned int offset, unsigned int length)
{
@@ -991,11 +929,11 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
dummy.rs_offset = segment->rs_offset + offset;
- ret = svc_rdma_build_read_segment(info, &dummy);
+ ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
if (ret < 0)
break;
- info->ri_totalbytes += dummy.rs_length;
+ head->rc_readbytes += dummy.rs_length;
length -= dummy.rs_length;
offset = 0;
}
@@ -1004,7 +942,8 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
/**
* svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
- * @info: context for RDMA Reads
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
*
* Return values:
* %0: RDMA Read WQEs were successfully built
@@ -1013,9 +952,9 @@ static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
+static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
{
- struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
const struct svc_rdma_chunk *call_chunk =
pcl_first_chunk(&head->rc_call_pcl);
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
@@ -1024,17 +963,18 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
int ret;
if (pcl_is_empty(pcl))
- return svc_rdma_build_read_chunk(info, call_chunk);
+ return svc_rdma_build_read_chunk(rqstp, head, call_chunk);
start = 0;
chunk = pcl_first_chunk(pcl);
length = chunk->ch_position;
- ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
+ ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
+ start, length);
if (ret < 0)
return ret;
pcl_for_each_chunk(chunk, pcl) {
- ret = svc_rdma_build_read_chunk(info, chunk);
+ ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
if (ret < 0)
return ret;
@@ -1043,8 +983,8 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
break;
start += length;
- length = next->ch_position - info->ri_totalbytes;
- ret = svc_rdma_read_chunk_range(info, call_chunk,
+ length = next->ch_position - head->rc_readbytes;
+ ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
start, length);
if (ret < 0)
return ret;
@@ -1052,12 +992,14 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
start += length;
length = call_chunk->ch_length - start;
- return svc_rdma_read_chunk_range(info, call_chunk, start, length);
+ return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
+ start, length);
}
/**
* svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
- * @info: context for RDMA Reads
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
*
* The start of the data lands in the first page just after the
* Transport header, and the rest lands in rqstp->rq_arg.pages.
@@ -1073,25 +1015,31 @@ static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
-static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
+static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
{
- struct xdr_buf *buf = &info->ri_rqst->rq_arg;
- int ret;
-
- ret = svc_rdma_read_call_chunk(info);
- if (ret < 0)
- goto out;
-
- buf->len += info->ri_totalbytes;
- buf->buflen += info->ri_totalbytes;
+ return svc_rdma_read_call_chunk(rqstp, head);
+}
- buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
- buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
- buf->pages = &info->ri_rqst->rq_pages[1];
- buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
+/* Pages under I/O have been copied to head->rc_pages. Ensure that
+ * svc_xprt_release() does not put them when svc_rdma_recvfrom()
+ * returns. This has to be done after all Read WRs are constructed
+ * to properly handle a page that happens to be part of I/O on behalf
+ * of two different RDMA segments.
+ *
+ * Note: if the subsequent post_send fails, these pages have already
+ * been moved to head->rc_pages and thus will be cleaned up by
+ * svc_rdma_recv_ctxt_put().
+ */
+static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head)
+{
+ unsigned int i;
-out:
- return ret;
+ for (i = 0; i < head->rc_page_count; i++) {
+ head->rc_pages[i] = rqstp->rq_pages[i];
+ rqstp->rq_pages[i] = NULL;
+ }
}
/**
@@ -1121,49 +1069,27 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head)
{
- struct svc_rdma_read_info *info;
- struct svc_rdma_chunk_ctxt *cc;
+ struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
int ret;
- info = svc_rdma_read_info_alloc(rdma);
- if (!info)
- return -ENOMEM;
- cc = &info->ri_cc;
- info->ri_rqst = rqstp;
- info->ri_readctxt = head;
- info->ri_pageno = 0;
- info->ri_pageoff = 0;
- info->ri_totalbytes = 0;
+ cc->cc_cqe.done = svc_rdma_wc_read_done;
+ cc->cc_sqecount = 0;
+ head->rc_pageoff = 0;
+ head->rc_curpage = 0;
+ head->rc_readbytes = 0;
if (pcl_is_empty(&head->rc_call_pcl)) {
if (head->rc_read_pcl.cl_count == 1)
- ret = svc_rdma_read_data_item(info);
+ ret = svc_rdma_read_data_item(rqstp, head);
else
- ret = svc_rdma_read_multiple_chunks(info);
+ ret = svc_rdma_read_multiple_chunks(rqstp, head);
} else
- ret = svc_rdma_read_special(info);
+ ret = svc_rdma_read_special(rqstp, head);
+ svc_rdma_clear_rqst_pages(rqstp, head);
if (ret < 0)
- goto out_err;
+ return ret;
trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
- init_completion(&cc->cc_done);
- ret = svc_rdma_post_chunk_ctxt(cc);
- if (ret < 0)
- goto out_err;
-
- ret = 1;
- wait_for_completion(&cc->cc_done);
- if (cc->cc_status != IB_WC_SUCCESS)
- ret = -EIO;
-
- /* rq_respages starts after the last arg page */
- rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
- /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
- head->rc_page_count = 0;
-
-out_err:
- svc_rdma_read_info_free(info);
- return ret;
+ ret = svc_rdma_post_chunk_ctxt(rdma, cc);
+ return ret < 0 ? ret : 1;
}