From 08b8a0440eeec83f8330349f829908858fd52d31 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 14 Mar 2022 15:47:18 -0400 Subject: libceph: add spinlock around osd->o_requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a later patch, we're going to need to search for a request in the rbtree, but taking the o_mutex is inconvenient as we already hold the con mutex at the point where we need it. Add a new spinlock that we take when inserting and erasing entries from the o_requests tree. Search of the rbtree can be done with either the mutex or the spinlock, but insertion and removal requires both. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index fb6be72104df..92addef18738 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -29,7 +29,12 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); #define CEPH_HOMELESS_OSD -1 -/* a given osd we're communicating with */ +/* + * A given osd we're communicating with. + * + * Note that the o_requests tree can be searched while holding the "lock" mutex + * or the "o_requests_lock" spinlock. Insertion or removal requires both! + */ struct ceph_osd { refcount_t o_ref; struct ceph_osd_client *o_osdc; @@ -37,6 +42,7 @@ struct ceph_osd { int o_incarnation; struct rb_node o_node; struct ceph_connection o_con; + spinlock_t o_requests_lock; struct rb_root o_requests; struct rb_root o_linger_requests; struct rb_root o_backoff_mappings; -- cgit v1.2.3 From a679e50f728648f7b2f3b349e082448abd388038 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Wed, 16 Mar 2022 15:23:00 -0400 Subject: libceph: define struct ceph_sparse_extent and add some helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the OSD sends back a sparse read reply, it contains an array of these structures. Define the structure and add a couple of helpers for dealing with them. Also add a place in struct ceph_osd_req_op to store the extent buffer, and code to free it if it's populated when the req is torn down. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 43 ++++++++++++++++++++++++++++++++++++++++- net/ceph/osd_client.c | 13 +++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 92addef18738..05da1e755b7b 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -29,6 +29,17 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); #define CEPH_HOMELESS_OSD -1 +/* + * A single extent in a SPARSE_READ reply. + * + * Note that these come from the OSD as little-endian values. On BE arches, + * we convert them in-place after receipt. + */ +struct ceph_sparse_extent { + u64 off; + u64 len; +} __packed; + /* * A given osd we're communicating with. * @@ -104,6 +115,8 @@ struct ceph_osd_req_op { u64 offset, length; u64 truncate_size; u32 truncate_seq; + int sparse_ext_cnt; + struct ceph_sparse_extent *sparse_ext; struct ceph_osd_data osd_data; } extent; struct { @@ -510,6 +523,20 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, u32 truncate_seq, u64 truncate_size, bool use_mempool); +int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt); + +/* + * How big an extent array should we preallocate for a sparse read? This is + * just a starting value. If we get more than this back from the OSD, the + * receiver will reallocate. + */ +#define CEPH_SPARSE_EXT_ARRAY_INITIAL 16 + +static inline int ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op) +{ + return __ceph_alloc_sparse_ext_map(op, CEPH_SPARSE_EXT_ARRAY_INITIAL); +} + extern void ceph_osdc_get_request(struct ceph_osd_request *req); extern void ceph_osdc_put_request(struct ceph_osd_request *req); @@ -564,5 +591,19 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, struct ceph_object_locator *oloc, struct ceph_watch_item **watchers, u32 *num_watchers); -#endif +/* Find offset into the buffer of the end of the extent map */ +static inline u64 ceph_sparse_ext_map_end(struct ceph_osd_req_op *op) +{ + struct ceph_sparse_extent *ext; + + /* No extents? No data */ + if (op->extent.sparse_ext_cnt == 0) + return 0; + + ext = &op->extent.sparse_ext[op->extent.sparse_ext_cnt - 1]; + + return ext->off + ext->len - op->extent.offset; +} + +#endif diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 6d7a430bfed8..3e03ae68722c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -378,6 +378,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: + kfree(op->extent.sparse_ext); ceph_osd_data_release(&op->extent.osd_data); break; case CEPH_OSD_OP_CALL: @@ -1120,6 +1121,18 @@ fail: } EXPORT_SYMBOL(ceph_osdc_new_request); +int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt) +{ + op->extent.sparse_ext_cnt = cnt; + op->extent.sparse_ext = kmalloc_array(cnt, + sizeof(*op->extent.sparse_ext), + GFP_NOFS); + if (!op->extent.sparse_ext) + return -ENOMEM; + return 0; +} +EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map); + /* * We keep osd requests in an rbtree, sorted by ->r_tid. */ -- cgit v1.2.3 From ec3bc567eac12c557a2b99bd0b34b5dff12cab23 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 25 Jan 2022 08:26:31 -0500 Subject: libceph: new sparse_read op, support sparse reads on msgr2 crc codepath MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for a new sparse_read ceph_connection operation. The idea is that the client driver can define this operation use it to do special handling for incoming reads. The alloc_msg routine will look at the request and determine whether the reply is expected to be sparse. If it is, then we'll dispatch to a different set of state machine states that will repeatedly call the driver's sparse_read op to get length and placement info for reading the extent map, and the extents themselves. This necessitates adding some new field to some other structs: - The msg gets a new bool to track whether it's a sparse_read request. - A new field is added to the cursor to track the amount remaining in the current extent. This is used to cap the read from the socket into the msg_data - Handing a revoke with all of this is particularly difficult, so I've added a new data_len_remain field to the v2 connection info, and then use that to skip that much on a revoke. We may want to expand the use of that to the normal read path as well, just for consistency's sake. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/messenger.h | 28 +++++++ net/ceph/messenger.c | 1 + net/ceph/messenger_v2.c | 167 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 187 insertions(+), 9 deletions(-) (limited to 'include') diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 99c1726be6ee..8a6938fa324e 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -17,6 +17,7 @@ struct ceph_msg; struct ceph_connection; +struct ceph_msg_data_cursor; /* * Ceph defines these callbacks for handling connection events. @@ -70,6 +71,30 @@ struct ceph_connection_operations { int used_proto, int result, const int *allowed_protos, int proto_cnt, const int *allowed_modes, int mode_cnt); + + /** + * sparse_read: read sparse data + * @con: connection we're reading from + * @cursor: data cursor for reading extents + * @buf: optional buffer to read into + * + * This should be called more than once, each time setting up to + * receive an extent into the current cursor position, and zeroing + * the holes between them. + * + * Returns amount of data to be read (in bytes), 0 if reading is + * complete, or -errno if there was an error. + * + * If @buf is set on a >0 return, then the data should be read into + * the provided buffer. Otherwise, it should be read into the cursor. + * + * The sparse read operation is expected to initialize the cursor + * with a length covering up to the end of the last extent. + */ + int (*sparse_read)(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor, + char **buf); + }; /* use format string %s%lld */ @@ -207,6 +232,7 @@ struct ceph_msg_data_cursor { struct ceph_msg_data *data; /* current data item */ size_t resid; /* bytes not yet consumed */ + int sr_resid; /* residual sparse_read len */ bool need_crc; /* crc update needed */ union { #ifdef CONFIG_BLOCK @@ -251,6 +277,7 @@ struct ceph_msg { struct kref kref; bool more_to_follow; bool needs_out_seq; + bool sparse_read; int front_alloc_len; struct ceph_msgpool *pool; @@ -395,6 +422,7 @@ struct ceph_connection_v2_info { void *conn_bufs[16]; int conn_buf_cnt; + int data_len_remain; struct kvec in_sign_kvecs[8]; struct kvec out_sign_kvecs[8]; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5eb4898cccd4..2eb10d7518e8 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1013,6 +1013,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, cursor->total_resid = length; cursor->data = msg->data; + cursor->sr_resid = 0; __ceph_msg_data_cursor_init(cursor); } diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index 1df1d29dee92..17c9a858bfbd 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -52,14 +52,16 @@ #define FRAME_LATE_STATUS_COMPLETE 0xe #define FRAME_LATE_STATUS_ABORTED_MASK 0xf -#define IN_S_HANDLE_PREAMBLE 1 -#define IN_S_HANDLE_CONTROL 2 -#define IN_S_HANDLE_CONTROL_REMAINDER 3 -#define IN_S_PREPARE_READ_DATA 4 -#define IN_S_PREPARE_READ_DATA_CONT 5 -#define IN_S_PREPARE_READ_ENC_PAGE 6 -#define IN_S_HANDLE_EPILOGUE 7 -#define IN_S_FINISH_SKIP 8 +#define IN_S_HANDLE_PREAMBLE 1 +#define IN_S_HANDLE_CONTROL 2 +#define IN_S_HANDLE_CONTROL_REMAINDER 3 +#define IN_S_PREPARE_READ_DATA 4 +#define IN_S_PREPARE_READ_DATA_CONT 5 +#define IN_S_PREPARE_READ_ENC_PAGE 6 +#define IN_S_PREPARE_SPARSE_DATA 7 +#define IN_S_PREPARE_SPARSE_DATA_CONT 8 +#define IN_S_HANDLE_EPILOGUE 9 +#define IN_S_FINISH_SKIP 10 #define OUT_S_QUEUE_DATA 1 #define OUT_S_QUEUE_DATA_CONT 2 @@ -1825,6 +1827,123 @@ static void prepare_read_data_cont(struct ceph_connection *con) con->v2.in_state = IN_S_HANDLE_EPILOGUE; } +static int prepare_sparse_read_cont(struct ceph_connection *con) +{ + int ret; + struct bio_vec bv; + char *buf = NULL; + struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor; + + WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT); + + if (iov_iter_is_bvec(&con->v2.in_iter)) { + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + con->in_data_crc = crc32c(con->in_data_crc, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + get_bvec_at(cursor, &bv); + memcpy_to_page(bv.bv_page, bv.bv_offset, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + } else { + con->in_data_crc = ceph_crc32c_page(con->in_data_crc, + con->v2.in_bvec.bv_page, + con->v2.in_bvec.bv_offset, + con->v2.in_bvec.bv_len); + } + + ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len); + cursor->sr_resid -= con->v2.in_bvec.bv_len; + dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__, + con->v2.in_bvec.bv_len, cursor->sr_resid); + WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid); + if (cursor->sr_resid) { + get_bvec_at(cursor, &bv); + if (bv.bv_len > cursor->sr_resid) + bv.bv_len = cursor->sr_resid; + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + con->v2.data_len_remain -= bv.bv_len; + return 0; + } + } else if (iov_iter_is_kvec(&con->v2.in_iter)) { + /* On first call, we have no kvec so don't compute crc */ + if (con->v2.in_kvec_cnt) { + WARN_ON_ONCE(con->v2.in_kvec_cnt > 1); + con->in_data_crc = crc32c(con->in_data_crc, + con->v2.in_kvecs[0].iov_base, + con->v2.in_kvecs[0].iov_len); + } + } else { + return -EIO; + } + + /* get next extent */ + ret = con->ops->sparse_read(con, cursor, &buf); + if (ret <= 0) { + if (ret < 0) + return ret; + + reset_in_kvecs(con); + add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; + return 0; + } + + if (buf) { + /* receive into buffer */ + reset_in_kvecs(con); + add_in_kvec(con, buf, ret); + con->v2.data_len_remain -= ret; + return 0; + } + + if (ret > cursor->total_resid) { + pr_warn("%s: ret 0x%x total_resid 0x%zx resid 0x%zx\n", + __func__, ret, cursor->total_resid, cursor->resid); + return -EIO; + } + get_bvec_at(cursor, &bv); + if (bv.bv_len > cursor->sr_resid) + bv.bv_len = cursor->sr_resid; + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + if (unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + con->v2.data_len_remain -= ret; + return ret; +} + +static int prepare_sparse_read_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->in_msg; + + dout("%s: starting sparse read\n", __func__); + + if (WARN_ON_ONCE(!con->ops->sparse_read)) + return -EOPNOTSUPP; + + if (!con_secure(con)) + con->in_data_crc = -1; + + reset_in_kvecs(con); + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT; + con->v2.data_len_remain = data_len(msg); + return prepare_sparse_read_cont(con); +} + static int prepare_read_tail_plain(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; @@ -1845,7 +1964,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con) } if (data_len(msg)) { - con->v2.in_state = IN_S_PREPARE_READ_DATA; + if (msg->sparse_read) + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA; + else + con->v2.in_state = IN_S_PREPARE_READ_DATA; } else { add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); con->v2.in_state = IN_S_HANDLE_EPILOGUE; @@ -2898,6 +3020,12 @@ static int populate_in_iter(struct ceph_connection *con) prepare_read_enc_page(con); ret = 0; break; + case IN_S_PREPARE_SPARSE_DATA: + ret = prepare_sparse_read_data(con); + break; + case IN_S_PREPARE_SPARSE_DATA_CONT: + ret = prepare_sparse_read_cont(con); + break; case IN_S_HANDLE_EPILOGUE: ret = handle_epilogue(con); break; @@ -3489,6 +3617,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con) con->v2.in_state = IN_S_FINISH_SKIP; } +static void revoke_at_prepare_sparse_data(struct ceph_connection *con) +{ + int resid; /* current piece of data */ + int remaining; + + WARN_ON(con_secure(con)); + WARN_ON(!data_len(con->in_msg)); + WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter)); + resid = iov_iter_count(&con->v2.in_iter); + dout("%s con %p resid %d\n", __func__, con, resid); + + remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain; + con->v2.in_iter.count -= resid; + set_in_skip(con, resid + remaining); + con->v2.in_state = IN_S_FINISH_SKIP; +} + static void revoke_at_handle_epilogue(struct ceph_connection *con) { int resid; @@ -3505,6 +3650,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con) void ceph_con_v2_revoke_incoming(struct ceph_connection *con) { switch (con->v2.in_state) { + case IN_S_PREPARE_SPARSE_DATA: case IN_S_PREPARE_READ_DATA: revoke_at_prepare_read_data(con); break; @@ -3514,6 +3660,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con) case IN_S_PREPARE_READ_ENC_PAGE: revoke_at_prepare_read_enc_page(con); break; + case IN_S_PREPARE_SPARSE_DATA_CONT: + revoke_at_prepare_sparse_data(con); + break; case IN_S_HANDLE_EPILOGUE: revoke_at_handle_epilogue(con); break; -- cgit v1.2.3 From d396f89db39a2f259e2125ca43b4c31bb65afcad Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 24 Mar 2022 13:33:06 -0400 Subject: libceph: add sparse read support to msgr1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 2 new fields to ceph_connection_v1_info to track the necessary info in sparse reads. Skip initializing the cursor for a sparse read. Break out read_partial_message_section into a wrapper around a new read_partial_message_chunk function that doesn't zero out the crc first. Add new helper functions to drive receiving into the destinations provided by the sparse_read state machine. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/messenger.h | 4 ++ net/ceph/messenger_v1.c | 98 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 94 insertions(+), 8 deletions(-) (limited to 'include') diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 8a6938fa324e..9fd7255172ad 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -336,6 +336,10 @@ struct ceph_connection_v1_info { int in_base_pos; /* bytes read */ + /* sparse reads */ + struct kvec in_sr_kvec; /* current location to receive into */ + u64 in_sr_len; /* amount of data in this extent */ + /* message in temps */ u8 in_tag; /* protocol control byte */ struct ceph_msg_header in_hdr; diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c index 3d57bb48a2b4..f9a50d7f0d20 100644 --- a/net/ceph/messenger_v1.c +++ b/net/ceph/messenger_v1.c @@ -159,9 +159,9 @@ static size_t sizeof_footer(struct ceph_connection *con) static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { - /* Initialize data cursor */ - - ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); + /* Initialize data cursor if it's not a sparse read */ + if (!msg->sparse_read) + ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); } /* @@ -960,9 +960,9 @@ static void process_ack(struct ceph_connection *con) prepare_read_tag(con); } -static int read_partial_message_section(struct ceph_connection *con, - struct kvec *section, - unsigned int sec_len, u32 *crc) +static int read_partial_message_chunk(struct ceph_connection *con, + struct kvec *section, + unsigned int sec_len, u32 *crc) { int ret, left; @@ -978,11 +978,91 @@ static int read_partial_message_section(struct ceph_connection *con, section->iov_len += ret; } if (section->iov_len == sec_len) - *crc = crc32c(0, section->iov_base, section->iov_len); + *crc = crc32c(*crc, section->iov_base, section->iov_len); return 1; } +static inline int read_partial_message_section(struct ceph_connection *con, + struct kvec *section, + unsigned int sec_len, u32 *crc) +{ + *crc = 0; + return read_partial_message_chunk(con, section, sec_len, crc); +} + +static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc) +{ + struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; + bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE); + + if (do_bounce && unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + while (cursor->sr_resid > 0) { + struct page *page, *rpage; + size_t off, len; + int ret; + + page = ceph_msg_data_next(cursor, &off, &len); + rpage = do_bounce ? con->bounce_page : page; + + /* clamp to what remains in extent */ + len = min_t(int, len, cursor->sr_resid); + ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len); + if (ret <= 0) + return ret; + *crc = ceph_crc32c_page(*crc, rpage, off, ret); + ceph_msg_data_advance(cursor, (size_t)ret); + cursor->sr_resid -= ret; + if (do_bounce) + memcpy_page(page, off, rpage, off, ret); + } + return 1; +} + +static int read_sparse_msg_data(struct ceph_connection *con) +{ + struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); + u32 crc = 0; + int ret = 1; + + if (do_datacrc) + crc = con->in_data_crc; + + do { + if (con->v1.in_sr_kvec.iov_base) + ret = read_partial_message_chunk(con, + &con->v1.in_sr_kvec, + con->v1.in_sr_len, + &crc); + else if (cursor->sr_resid > 0) + ret = read_sparse_msg_extent(con, &crc); + + if (ret <= 0) { + if (do_datacrc) + con->in_data_crc = crc; + return ret; + } + + memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec)); + ret = con->ops->sparse_read(con, cursor, + (char **)&con->v1.in_sr_kvec.iov_base); + con->v1.in_sr_len = ret; + } while (ret > 0); + + if (do_datacrc) + con->in_data_crc = crc; + + return ret < 0 ? ret : 1; /* must return > 0 to indicate success */ +} + static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; @@ -1173,7 +1253,9 @@ static int read_partial_message(struct ceph_connection *con) if (!m->num_data_items) return -EIO; - if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) + if (m->sparse_read) + ret = read_sparse_msg_data(con); + else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) ret = read_partial_msg_data_bounce(con); else ret = read_partial_msg_data(con); -- cgit v1.2.3 From f628d799972799023d32c2542bb2639eb8c4f84e Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 11 Feb 2022 11:38:02 -0500 Subject: libceph: add sparse read support to OSD client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Have get_reply check for the presence of sparse read ops in the request and set the sparse_read boolean in the msg. That will queue the messenger layer to use the sparse read codepath instead of the normal data receive. Add a new sparse_read operation for the OSD client, driven by its own state machine. The messenger will repeatedly call the sparse_read operation, and it will pass back the necessary info to set up to read the next extent of data, while zero-filling the sparse regions. The state machine will stop at the end of the last extent, and will attach the extent map buffer to the ceph_osd_req_op so that the caller can use it. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 32 +++++ net/ceph/osd_client.c | 257 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 285 insertions(+), 4 deletions(-) (limited to 'include') diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 05da1e755b7b..bfa4813590da 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -40,6 +40,36 @@ struct ceph_sparse_extent { u64 len; } __packed; +/* Sparse read state machine state values */ +enum ceph_sparse_read_state { + CEPH_SPARSE_READ_HDR = 0, + CEPH_SPARSE_READ_EXTENTS, + CEPH_SPARSE_READ_DATA_LEN, + CEPH_SPARSE_READ_DATA, +}; + +/* + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of + * 64-bit offset/length pairs, and then all of the actual file data + * concatenated after it (sans holes). + * + * Unfortunately, we don't know how long the extent array is until we've + * started reading the data section of the reply. The caller should send down + * a destination buffer for the array, but we'll alloc one if it's too small + * or if the caller doesn't. + */ +struct ceph_sparse_read { + enum ceph_sparse_read_state sr_state; /* state machine state */ + u64 sr_req_off; /* orig request offset */ + u64 sr_req_len; /* orig request length */ + u64 sr_pos; /* current pos in buffer */ + int sr_index; /* current extent index */ + __le32 sr_datalen; /* length of actual data */ + u32 sr_count; /* extent count in reply */ + int sr_ext_len; /* length of extent array */ + struct ceph_sparse_extent *sr_extent; /* extent array */ +}; + /* * A given osd we're communicating with. * @@ -48,6 +78,7 @@ struct ceph_sparse_extent { */ struct ceph_osd { refcount_t o_ref; + int o_sparse_op_idx; struct ceph_osd_client *o_osdc; int o_osd; int o_incarnation; @@ -63,6 +94,7 @@ struct ceph_osd { unsigned long lru_ttl; struct list_head o_keepalive_item; struct mutex lock; + struct ceph_sparse_read o_sparse_read; }; #define CEPH_OSD_SLAB_OPS 2 diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3e03ae68722c..0aacbadcab06 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, switch (op->op) { case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: kfree(op->extent.sparse_ext); @@ -670,6 +671,7 @@ static void get_num_data_items(struct ceph_osd_request *req, /* reply */ case CEPH_OSD_OP_STAT: case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_LIST_WATCHERS: *num_reply_data_items += 1; break; @@ -739,7 +741,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && - opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ); op->extent.offset = offset; op->extent.length = length; @@ -964,6 +966,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_STAT: break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: @@ -1060,7 +1063,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && - opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); + opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE && + opcode != CEPH_OSD_OP_SPARSE_READ); req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); @@ -1201,6 +1205,13 @@ static void osd_init(struct ceph_osd *osd) mutex_init(&osd->lock); } +static void ceph_init_sparse_read(struct ceph_sparse_read *sr) +{ + kfree(sr->sr_extent); + memset(sr, '\0', sizeof(*sr)); + sr->sr_state = CEPH_SPARSE_READ_HDR; +} + static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); @@ -1211,6 +1222,8 @@ static void osd_cleanup(struct ceph_osd *osd) WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); + ceph_init_sparse_read(&osd->o_sparse_read); + if (osd->o_auth.authorizer) { WARN_ON(osd_homeless(osd)); ceph_auth_destroy_authorizer(osd->o_auth.authorizer); @@ -1230,6 +1243,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) osd_init(osd); osd->o_osdc = osdc; osd->o_osd = onum; + osd->o_sparse_op_idx = -1; + + ceph_init_sparse_read(&osd->o_sparse_read); ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); @@ -2034,6 +2050,7 @@ static void setup_request_data(struct ceph_osd_request *req) &op->raw_data_in); break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; @@ -2453,8 +2470,10 @@ static void finish_request(struct ceph_osd_request *req) req->r_end_latency = ktime_get(); - if (req->r_osd) + if (req->r_osd) { + ceph_init_sparse_read(&req->r_osd->o_sparse_read); unlink_request(req->r_osd, req); + } atomic_dec(&osdc->num_requests); /* @@ -5366,6 +5385,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_msg_put(msg); } +/* How much sparse data was requested? */ +static u64 sparse_data_requested(struct ceph_osd_request *req) +{ + u64 len = 0; + + if (req->r_flags & CEPH_OSD_FLAG_READ) { + int i; + + for (i = 0; i < req->r_num_ops; ++i) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + if (op->op == CEPH_OSD_OP_SPARSE_READ) + len += op->extent.length; + } + } + return len; +} + /* * Lookup and return message for incoming reply. Don't try to do * anything about a larger than preallocated data portion of the @@ -5382,6 +5419,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid = le64_to_cpu(hdr->tid); + u64 srlen; down_read(&osdc->lock); if (!osd_registered(osd)) { @@ -5414,7 +5452,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply = m; } - if (data_len > req->r_reply->data_length) { + srlen = sparse_data_requested(req); + if (!srlen && data_len > req->r_reply->data_length) { pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", __func__, osd->o_osd, req->r_tid, data_len, req->r_reply->data_length); @@ -5424,6 +5463,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } m = ceph_msg_get(req->r_reply); + m->sparse_read = (bool)srlen; + dout("get_reply tid %lld %p\n", tid, m); out_unlock_session: @@ -5656,9 +5697,217 @@ static int osd_check_message_signature(struct ceph_msg *msg) return ceph_auth_check_message_signature(auth, msg); } +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, + bool zero) +{ + while (len) { + struct page *page; + size_t poff, plen; + + page = ceph_msg_data_next(cursor, &poff, &plen); + if (plen > len) + plen = len; + if (zero) + zero_user_segment(page, poff, poff + plen); + len -= plen; + ceph_msg_data_advance(cursor, plen); + } +} + +static int prep_next_sparse_read(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor) +{ + struct ceph_osd *o = con->private; + struct ceph_sparse_read *sr = &o->o_sparse_read; + struct ceph_osd_request *req; + struct ceph_osd_req_op *op; + + spin_lock(&o->o_requests_lock); + req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid)); + if (!req) { + spin_unlock(&o->o_requests_lock); + return -EBADR; + } + + if (o->o_sparse_op_idx < 0) { + u64 srlen = sparse_data_requested(req); + + dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n", + __func__, o->o_osd, srlen); + ceph_msg_data_cursor_init(cursor, con->in_msg, srlen); + } else { + u64 end; + + op = &req->r_ops[o->o_sparse_op_idx]; + + WARN_ON_ONCE(op->extent.sparse_ext); + + /* hand back buffer we took earlier */ + op->extent.sparse_ext = sr->sr_extent; + sr->sr_extent = NULL; + op->extent.sparse_ext_cnt = sr->sr_count; + sr->sr_ext_len = 0; + dout("%s: [%d] completed extent array len %d cursor->resid %zd\n", + __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid); + /* Advance to end of data for this operation */ + end = ceph_sparse_ext_map_end(op); + if (end < sr->sr_req_len) + advance_cursor(cursor, sr->sr_req_len - end, false); + } + + ceph_init_sparse_read(sr); + + /* find next op in this request (if any) */ + while (++o->o_sparse_op_idx < req->r_num_ops) { + op = &req->r_ops[o->o_sparse_op_idx]; + if (op->op == CEPH_OSD_OP_SPARSE_READ) + goto found; + } + + /* reset for next sparse read request */ + spin_unlock(&o->o_requests_lock); + o->o_sparse_op_idx = -1; + return 0; +found: + sr->sr_req_off = op->extent.offset; + sr->sr_req_len = op->extent.length; + sr->sr_pos = sr->sr_req_off; + dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__, + o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len); + + /* hand off request's sparse extent map buffer */ + sr->sr_ext_len = op->extent.sparse_ext_cnt; + op->extent.sparse_ext_cnt = 0; + sr->sr_extent = op->extent.sparse_ext; + op->extent.sparse_ext = NULL; + + spin_unlock(&o->o_requests_lock); + return 1; +} + +#ifdef __BIG_ENDIAN +static inline void convert_extent_map(struct ceph_sparse_read *sr) +{ + int i; + + for (i = 0; i < sr->sr_count; i++) { + struct ceph_sparse_extent *ext = &sr->sr_extent[i]; + + ext->off = le64_to_cpu((__force __le64)ext->off); + ext->len = le64_to_cpu((__force __le64)ext->len); + } +} +#else +static inline void convert_extent_map(struct ceph_sparse_read *sr) +{ +} +#endif + +#define MAX_EXTENTS 4096 + +static int osd_sparse_read(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor, + char **pbuf) +{ + struct ceph_osd *o = con->private; + struct ceph_sparse_read *sr = &o->o_sparse_read; + u32 count = sr->sr_count; + u64 eoff, elen; + int ret; + + switch (sr->sr_state) { + case CEPH_SPARSE_READ_HDR: +next_op: + ret = prep_next_sparse_read(con, cursor); + if (ret <= 0) + return ret; + + /* number of extents */ + ret = sizeof(sr->sr_count); + *pbuf = (char *)&sr->sr_count; + sr->sr_state = CEPH_SPARSE_READ_EXTENTS; + break; + case CEPH_SPARSE_READ_EXTENTS: + /* Convert sr_count to host-endian */ + count = le32_to_cpu((__force __le32)sr->sr_count); + sr->sr_count = count; + dout("[%d] got %u extents\n", o->o_osd, count); + + if (count > 0) { + if (!sr->sr_extent || count > sr->sr_ext_len) { + /* + * Apply a hard cap to the number of extents. + * If we have more, assume something is wrong. + */ + if (count > MAX_EXTENTS) { + dout("%s: OSD returned 0x%x extents in a single reply!\n", + __func__, count); + return -EREMOTEIO; + } + + /* no extent array provided, or too short */ + kfree(sr->sr_extent); + sr->sr_extent = kmalloc_array(count, + sizeof(*sr->sr_extent), + GFP_NOIO); + if (!sr->sr_extent) + return -ENOMEM; + sr->sr_ext_len = count; + } + ret = count * sizeof(*sr->sr_extent); + *pbuf = (char *)sr->sr_extent; + sr->sr_state = CEPH_SPARSE_READ_DATA_LEN; + break; + } + /* No extents? Read data len */ + fallthrough; + case CEPH_SPARSE_READ_DATA_LEN: + convert_extent_map(sr); + ret = sizeof(sr->sr_datalen); + *pbuf = (char *)&sr->sr_datalen; + sr->sr_state = CEPH_SPARSE_READ_DATA; + break; + case CEPH_SPARSE_READ_DATA: + if (sr->sr_index >= count) { + sr->sr_state = CEPH_SPARSE_READ_HDR; + goto next_op; + } + + eoff = sr->sr_extent[sr->sr_index].off; + elen = sr->sr_extent[sr->sr_index].len; + + dout("[%d] ext %d off 0x%llx len 0x%llx\n", + o->o_osd, sr->sr_index, eoff, elen); + + if (elen > INT_MAX) { + dout("Sparse read extent length too long (0x%llx)\n", + elen); + return -EREMOTEIO; + } + + /* zero out anything from sr_pos to start of extent */ + if (sr->sr_pos < eoff) + advance_cursor(cursor, eoff - sr->sr_pos, true); + + /* Set position to end of extent */ + sr->sr_pos = eoff + elen; + + /* send back the new length and nullify the ptr */ + cursor->sr_resid = elen; + ret = elen; + *pbuf = NULL; + + /* Bump the array index */ + ++sr->sr_index; + break; + } + return ret; +} + static const struct ceph_connection_operations osd_con_ops = { .get = osd_get_con, .put = osd_put_con, + .sparse_read = osd_sparse_read, .alloc_msg = osd_alloc_msg, .dispatch = osd_dispatch, .fault = osd_fault, -- cgit v1.2.3 From dee0c5f834605ce9b384ee8b9c7032ffd8db4eca Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 1 Jul 2022 06:30:12 -0400 Subject: libceph: add new iov_iter-based ceph_msg_data_type and ceph_osd_data_type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an iov_iter to the unions in ceph_msg_data and ceph_msg_data_cursor. Instead of requiring a list of pages or bvecs, we can just use an iov_iter directly, and avoid extra allocations. We assume that the pages represented by the iter are pinned such that they shouldn't incur page faults, which is the case for the iov_iters created by netfs. While working on this, Al Viro informed me that he was going to change iov_iter_get_pages to auto-advance the iterator as that pattern is more or less required for ITER_PIPE anyway. We emulate that here for now by advancing in the _next op and tracking that amount in the "lastlen" field. In the event that _next is called twice without an intervening _advance, we revert the iov_iter by the remaining lastlen before calling iov_iter_get_pages. Cc: Al Viro Cc: David Howells Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/messenger.h | 8 +++++ include/linux/ceph/osd_client.h | 4 +++ net/ceph/messenger.c | 77 +++++++++++++++++++++++++++++++++++++++++ net/ceph/osd_client.c | 27 +++++++++++++++ 4 files changed, 116 insertions(+) (limited to 'include') diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 9fd7255172ad..2eaaabbe98cb 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -123,6 +123,7 @@ enum ceph_msg_data_type { CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */ #endif /* CONFIG_BLOCK */ CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ + CEPH_MSG_DATA_ITER, /* data source/destination is an iov_iter */ }; #ifdef CONFIG_BLOCK @@ -224,6 +225,7 @@ struct ceph_msg_data { bool own_pages; }; struct ceph_pagelist *pagelist; + struct iov_iter iter; }; }; @@ -248,6 +250,10 @@ struct ceph_msg_data_cursor { struct page *page; /* page from list */ size_t offset; /* bytes from list */ }; + struct { + struct iov_iter iov_iter; + unsigned int lastlen; + }; }; }; @@ -605,6 +611,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, #endif /* CONFIG_BLOCK */ void ceph_msg_data_add_bvecs(struct ceph_msg *msg, struct ceph_bvec_iter *bvec_pos); +void ceph_msg_data_add_iter(struct ceph_msg *msg, + struct iov_iter *iter); struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, gfp_t flags, bool can_fail); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index bfa4813590da..8f5d2b5bbba2 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -108,6 +108,7 @@ enum ceph_osd_data_type { CEPH_OSD_DATA_TYPE_BIO, #endif /* CONFIG_BLOCK */ CEPH_OSD_DATA_TYPE_BVECS, + CEPH_OSD_DATA_TYPE_ITER, }; struct ceph_osd_data { @@ -131,6 +132,7 @@ struct ceph_osd_data { struct ceph_bvec_iter bvec_pos; u32 num_bvecs; }; + struct iov_iter iter; }; }; @@ -501,6 +503,8 @@ void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req, void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, unsigned int which, struct ceph_bvec_iter *bvec_pos); +void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req, + unsigned int which, struct iov_iter *iter); extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *, unsigned int which, diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 2eb10d7518e8..10a41cd9c523 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -969,6 +969,62 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, return true; } +static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor, + size_t length) +{ + struct ceph_msg_data *data = cursor->data; + + cursor->iov_iter = data->iter; + cursor->lastlen = 0; + iov_iter_truncate(&cursor->iov_iter, length); + cursor->resid = iov_iter_count(&cursor->iov_iter); +} + +static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor, + size_t *page_offset, size_t *length) +{ + struct page *page; + ssize_t len; + + if (cursor->lastlen) + iov_iter_revert(&cursor->iov_iter, cursor->lastlen); + + len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE, + 1, page_offset); + BUG_ON(len < 0); + + cursor->lastlen = len; + + /* + * FIXME: The assumption is that the pages represented by the iov_iter + * are pinned, with the references held by the upper-level + * callers, or by virtue of being under writeback. Eventually, + * we'll get an iov_iter_get_pages2 variant that doesn't take + * page refs. Until then, just put the page ref. + */ + VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page); + put_page(page); + + *length = min_t(size_t, len, cursor->resid); + return page; +} + +static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor, + size_t bytes) +{ + BUG_ON(bytes > cursor->resid); + cursor->resid -= bytes; + + if (bytes < cursor->lastlen) { + cursor->lastlen -= bytes; + } else { + iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen); + cursor->lastlen = 0; + } + + return cursor->resid; +} + /* * Message data is handled (sent or received) in pieces, where each * piece resides on a single page. The network layer might not @@ -996,6 +1052,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) case CEPH_MSG_DATA_BVECS: ceph_msg_data_bvecs_cursor_init(cursor, length); break; + case CEPH_MSG_DATA_ITER: + ceph_msg_data_iter_cursor_init(cursor, length); + break; case CEPH_MSG_DATA_NONE: default: /* BUG(); */ @@ -1043,6 +1102,9 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, case CEPH_MSG_DATA_BVECS: page = ceph_msg_data_bvecs_next(cursor, page_offset, length); break; + case CEPH_MSG_DATA_ITER: + page = ceph_msg_data_iter_next(cursor, page_offset, length); + break; case CEPH_MSG_DATA_NONE: default: page = NULL; @@ -1081,6 +1143,9 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) case CEPH_MSG_DATA_BVECS: new_piece = ceph_msg_data_bvecs_advance(cursor, bytes); break; + case CEPH_MSG_DATA_ITER: + new_piece = ceph_msg_data_iter_advance(cursor, bytes); + break; case CEPH_MSG_DATA_NONE: default: BUG(); @@ -1880,6 +1945,18 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg, } EXPORT_SYMBOL(ceph_msg_data_add_bvecs); +void ceph_msg_data_add_iter(struct ceph_msg *msg, + struct iov_iter *iter) +{ + struct ceph_msg_data *data; + + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_ITER; + data->iter = *iter; + + msg->data_length += iov_iter_count(&data->iter); +} + /* * construct a new message with given type, size * the new msg has a ref count of 1. diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0aacbadcab06..684faf8553de 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,6 +171,13 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, osd_data->num_bvecs = num_bvecs; } +static void ceph_osd_iter_init(struct ceph_osd_data *osd_data, + struct iov_iter *iter) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_ITER; + osd_data->iter = *iter; +} + static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) { @@ -264,6 +271,22 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos); +/** + * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer + * @osd_req: The request to set up + * @which: Index of the operation in which to set the iter + * @iter: The buffer iterator + */ +void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req, + unsigned int which, struct iov_iter *iter) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); + ceph_osd_iter_init(osd_data, iter); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_iter); + static void osd_req_op_cls_request_info_pagelist( struct ceph_osd_request *osd_req, unsigned int which, struct ceph_pagelist *pagelist) @@ -346,6 +369,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) #endif /* CONFIG_BLOCK */ case CEPH_OSD_DATA_TYPE_BVECS: return osd_data->bvec_pos.iter.bi_size; + case CEPH_OSD_DATA_TYPE_ITER: + return iov_iter_count(&osd_data->iter); default: WARN(true, "unrecognized data type %d\n", (int)osd_data->type); return 0; @@ -954,6 +979,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, #endif } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) { ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos); + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) { + ceph_msg_data_add_iter(msg, &osd_data->iter); } else { BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); } -- cgit v1.2.3 From 2d332d5bc424404911540006a8bb450fbb96b178 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 27 Jul 2020 10:16:09 -0400 Subject: ceph: fscrypt_auth handling for ceph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Most fscrypt-enabled filesystems store the crypto context in an xattr, but that's problematic for ceph as xatts are governed by the XATTR cap, but we really want the crypto context as part of the AUTH cap. Because of this, the MDS has added two new inode metadata fields: fscrypt_auth and fscrypt_file. The former is used to hold the crypto context, and the latter is used to track the real file size. Parse new fscrypt_auth and fscrypt_file fields in inode traces. For now, we don't use fscrypt_file, but fscrypt_auth is used to hold the fscrypt context. Allow the client to use a setattr request for setting the fscrypt_auth field. Since this is not a standard setattr request from the VFS, we add a new field to __ceph_setattr that carries ceph-specific inode attrs. Have the set_context op do a setattr that sets the fscrypt_auth value, and get_context just return the contents of that field (since it should always be available). Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- fs/ceph/Makefile | 1 + fs/ceph/acl.c | 4 +- fs/ceph/caps.c | 78 ++++++++++++++++++++++++----- fs/ceph/crypto.c | 77 +++++++++++++++++++++++++++++ fs/ceph/crypto.h | 36 ++++++++++++++ fs/ceph/inode.c | 64 +++++++++++++++++++++++- fs/ceph/mds_client.c | 114 +++++++++++++++++++++++++++++++++++++++---- fs/ceph/mds_client.h | 7 +++ fs/ceph/super.c | 3 ++ fs/ceph/super.h | 15 +++++- include/linux/ceph/ceph_fs.h | 21 +++++--- 11 files changed, 385 insertions(+), 35 deletions(-) create mode 100644 fs/ceph/crypto.c create mode 100644 fs/ceph/crypto.h (limited to 'include') diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 50c635dc7f71..1f77ca04c426 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -12,3 +12,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ ceph-$(CONFIG_CEPH_FSCACHE) += cache.o ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o +ceph-$(CONFIG_FS_ENCRYPTION) += crypto.o diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 6945a938d396..8a56f979c7cb 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -140,7 +140,7 @@ int ceph_set_acl(struct mnt_idmap *idmap, struct dentry *dentry, newattrs.ia_ctime = current_time(inode); newattrs.ia_mode = new_mode; newattrs.ia_valid = ATTR_MODE | ATTR_CTIME; - ret = __ceph_setattr(inode, &newattrs); + ret = __ceph_setattr(inode, &newattrs, NULL); if (ret) goto out_free; } @@ -151,7 +151,7 @@ int ceph_set_acl(struct mnt_idmap *idmap, struct dentry *dentry, newattrs.ia_ctime = old_ctime; newattrs.ia_mode = old_mode; newattrs.ia_valid = ATTR_MODE | ATTR_CTIME; - __ceph_setattr(inode, &newattrs); + __ceph_setattr(inode, &newattrs, NULL); } goto out_free; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index e2bb0d0072da..1c62ef339bc6 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -14,6 +14,7 @@ #include "super.h" #include "mds_client.h" #include "cache.h" +#include "crypto.h" #include #include @@ -1216,15 +1217,12 @@ struct cap_msg_args { umode_t mode; bool inline_data; bool wake; + u32 fscrypt_auth_len; + u32 fscrypt_file_len; + u8 fscrypt_auth[sizeof(struct ceph_fscrypt_auth)]; // for context + u8 fscrypt_file[sizeof(u64)]; // for size }; -/* - * cap struct size + flock buffer size + inline version + inline data size + - * osd_epoch_barrier + oldest_flush_tid - */ -#define CAP_MSG_SIZE (sizeof(struct ceph_mds_caps) + \ - 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4) - /* Marshal up the cap msg to the MDS */ static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg) { @@ -1240,7 +1238,7 @@ static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg) arg->size, arg->max_size, arg->xattr_version, arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0); - msg->hdr.version = cpu_to_le16(10); + msg->hdr.version = cpu_to_le16(12); msg->hdr.tid = cpu_to_le64(arg->flush_tid); fc = msg->front.iov_base; @@ -1311,6 +1309,21 @@ static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg) /* Advisory flags (version 10) */ ceph_encode_32(&p, arg->flags); + + /* dirstats (version 11) - these are r/o on the client */ + ceph_encode_64(&p, 0); + ceph_encode_64(&p, 0); + +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) + /* fscrypt_auth and fscrypt_file (version 12) */ + ceph_encode_32(&p, arg->fscrypt_auth_len); + ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len); + ceph_encode_32(&p, arg->fscrypt_file_len); + ceph_encode_copy(&p, arg->fscrypt_file, arg->fscrypt_file_len); +#else /* CONFIG_FS_ENCRYPTION */ + ceph_encode_32(&p, 0); + ceph_encode_32(&p, 0); +#endif /* CONFIG_FS_ENCRYPTION */ } /* @@ -1432,7 +1445,37 @@ static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap, } } arg->flags = flags; +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) + if (ci->fscrypt_auth_len && + WARN_ON_ONCE(ci->fscrypt_auth_len > sizeof(struct ceph_fscrypt_auth))) { + /* Don't set this if it's too big */ + arg->fscrypt_auth_len = 0; + } else { + arg->fscrypt_auth_len = ci->fscrypt_auth_len; + memcpy(arg->fscrypt_auth, ci->fscrypt_auth, + min_t(size_t, ci->fscrypt_auth_len, + sizeof(arg->fscrypt_auth))); + } + /* FIXME: use this to track "real" size */ + arg->fscrypt_file_len = 0; +#endif /* CONFIG_FS_ENCRYPTION */ +} + +#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \ + 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4) + +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) +static inline int cap_msg_size(struct cap_msg_args *arg) +{ + return CAP_MSG_FIXED_FIELDS + arg->fscrypt_auth_len + + arg->fscrypt_file_len; } +#else +static inline int cap_msg_size(struct cap_msg_args *arg) +{ + return CAP_MSG_FIXED_FIELDS; +} +#endif /* CONFIG_FS_ENCRYPTION */ /* * Send a cap msg on the given inode. @@ -1444,7 +1487,8 @@ static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci) struct ceph_msg *msg; struct inode *inode = &ci->netfs.inode; - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false); + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(arg), GFP_NOFS, + false); if (!msg) { pr_err("error allocating cap msg: ino (%llx.%llx) flushing %s tid %llu, requeuing cap.\n", ceph_vinop(inode), ceph_cap_string(arg->dirty), @@ -1470,10 +1514,6 @@ static inline int __send_flush_snap(struct inode *inode, struct cap_msg_args arg; struct ceph_msg *msg; - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false); - if (!msg) - return -ENOMEM; - arg.session = session; arg.ino = ceph_vino(inode).ino; arg.cid = 0; @@ -1511,6 +1551,18 @@ static inline int __send_flush_snap(struct inode *inode, arg.flags = 0; arg.wake = false; + /* + * No fscrypt_auth changes from a capsnap. It will need + * to update fscrypt_file on size changes (TODO). + */ + arg.fscrypt_auth_len = 0; + arg.fscrypt_file_len = 0; + + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(&arg), + GFP_NOFS, false); + if (!msg) + return -ENOMEM; + encode_cap_msg(msg, &arg); ceph_con_send(&arg.session->s_con, msg); return 0; diff --git a/fs/ceph/crypto.c b/fs/ceph/crypto.c new file mode 100644 index 000000000000..b17a6ee16cf0 --- /dev/null +++ b/fs/ceph/crypto.c @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: GPL-2.0 +#include +#include +#include + +#include "super.h" +#include "crypto.h" + +static int ceph_crypt_get_context(struct inode *inode, void *ctx, size_t len) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fscrypt_auth *cfa = (struct ceph_fscrypt_auth *)ci->fscrypt_auth; + u32 ctxlen; + + /* Non existent or too short? */ + if (!cfa || (ci->fscrypt_auth_len < (offsetof(struct ceph_fscrypt_auth, cfa_blob) + 1))) + return -ENOBUFS; + + /* Some format we don't recognize? */ + if (le32_to_cpu(cfa->cfa_version) != CEPH_FSCRYPT_AUTH_VERSION) + return -ENOBUFS; + + ctxlen = le32_to_cpu(cfa->cfa_blob_len); + if (len < ctxlen) + return -ERANGE; + + memcpy(ctx, cfa->cfa_blob, ctxlen); + return ctxlen; +} + +static int ceph_crypt_set_context(struct inode *inode, const void *ctx, + size_t len, void *fs_data) +{ + int ret; + struct iattr attr = { }; + struct ceph_iattr cia = { }; + struct ceph_fscrypt_auth *cfa; + + WARN_ON_ONCE(fs_data); + + if (len > FSCRYPT_SET_CONTEXT_MAX_SIZE) + return -EINVAL; + + cfa = kzalloc(sizeof(*cfa), GFP_KERNEL); + if (!cfa) + return -ENOMEM; + + cfa->cfa_version = cpu_to_le32(CEPH_FSCRYPT_AUTH_VERSION); + cfa->cfa_blob_len = cpu_to_le32(len); + memcpy(cfa->cfa_blob, ctx, len); + + cia.fscrypt_auth = cfa; + + ret = __ceph_setattr(inode, &attr, &cia); + if (ret == 0) + inode_set_flags(inode, S_ENCRYPTED, S_ENCRYPTED); + kfree(cia.fscrypt_auth); + return ret; +} + +static bool ceph_crypt_empty_dir(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + return ci->i_rsubdirs + ci->i_rfiles == 1; +} + +static struct fscrypt_operations ceph_fscrypt_ops = { + .get_context = ceph_crypt_get_context, + .set_context = ceph_crypt_set_context, + .empty_dir = ceph_crypt_empty_dir, +}; + +void ceph_fscrypt_set_ops(struct super_block *sb) +{ + fscrypt_set_ops(sb, &ceph_fscrypt_ops); +} diff --git a/fs/ceph/crypto.h b/fs/ceph/crypto.h new file mode 100644 index 000000000000..6dca674f79b8 --- /dev/null +++ b/fs/ceph/crypto.h @@ -0,0 +1,36 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Ceph fscrypt functionality + */ + +#ifndef _CEPH_CRYPTO_H +#define _CEPH_CRYPTO_H + +#include + +struct ceph_fscrypt_auth { + __le32 cfa_version; + __le32 cfa_blob_len; + u8 cfa_blob[FSCRYPT_SET_CONTEXT_MAX_SIZE]; +} __packed; + +#define CEPH_FSCRYPT_AUTH_VERSION 1 +static inline u32 ceph_fscrypt_auth_len(struct ceph_fscrypt_auth *fa) +{ + u32 ctxsize = le32_to_cpu(fa->cfa_blob_len); + + return offsetof(struct ceph_fscrypt_auth, cfa_blob) + ctxsize; +} + +#ifdef CONFIG_FS_ENCRYPTION +void ceph_fscrypt_set_ops(struct super_block *sb); + +#else /* CONFIG_FS_ENCRYPTION */ + +static inline void ceph_fscrypt_set_ops(struct super_block *sb) +{ +} + +#endif /* CONFIG_FS_ENCRYPTION */ + +#endif diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e23172024707..a3aa7870a6a2 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -14,10 +14,12 @@ #include #include #include +#include #include "super.h" #include "mds_client.h" #include "cache.h" +#include "crypto.h" #include /* @@ -617,6 +619,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_WORK(&ci->i_work, ceph_inode_work); ci->i_work_mask = 0; memset(&ci->i_btime, '\0', sizeof(ci->i_btime)); +#ifdef CONFIG_FS_ENCRYPTION + ci->fscrypt_auth = NULL; + ci->fscrypt_auth_len = 0; +#endif return &ci->netfs.inode; } @@ -625,6 +631,9 @@ void ceph_free_inode(struct inode *inode) struct ceph_inode_info *ci = ceph_inode(inode); kfree(ci->i_symlink); +#ifdef CONFIG_FS_ENCRYPTION + kfree(ci->fscrypt_auth); +#endif kmem_cache_free(ceph_inode_cachep, ci); } @@ -645,6 +654,7 @@ void ceph_evict_inode(struct inode *inode) clear_inode(inode); ceph_fscache_unregister_inode_cookie(ci); + fscrypt_put_encryption_info(inode); __ceph_remove_caps(ci); @@ -935,6 +945,17 @@ int ceph_fill_inode(struct inode *inode, struct page *locked_page, __ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files); +#ifdef CONFIG_FS_ENCRYPTION + if (iinfo->fscrypt_auth_len && (inode->i_state & I_NEW)) { + kfree(ci->fscrypt_auth); + ci->fscrypt_auth_len = iinfo->fscrypt_auth_len; + ci->fscrypt_auth = iinfo->fscrypt_auth; + iinfo->fscrypt_auth = NULL; + iinfo->fscrypt_auth_len = 0; + inode_set_flags(inode, S_ENCRYPTED, S_ENCRYPTED); + } +#endif + if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) && (issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = mode; @@ -2079,7 +2100,8 @@ static const struct inode_operations ceph_symlink_iops = { .listxattr = ceph_listxattr, }; -int __ceph_setattr(struct inode *inode, struct iattr *attr) +int __ceph_setattr(struct inode *inode, struct iattr *attr, + struct ceph_iattr *cia) { struct ceph_inode_info *ci = ceph_inode(inode); unsigned int ia_valid = attr->ia_valid; @@ -2119,6 +2141,43 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) } dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); +#if IS_ENABLED(CONFIG_FS_ENCRYPTION) + if (cia && cia->fscrypt_auth) { + u32 len = ceph_fscrypt_auth_len(cia->fscrypt_auth); + + if (len > sizeof(*cia->fscrypt_auth)) { + err = -EINVAL; + spin_unlock(&ci->i_ceph_lock); + goto out; + } + + dout("setattr %llx:%llx fscrypt_auth len %u to %u)\n", + ceph_vinop(inode), ci->fscrypt_auth_len, len); + + /* It should never be re-set once set */ + WARN_ON_ONCE(ci->fscrypt_auth); + + if (issued & CEPH_CAP_AUTH_EXCL) { + dirtied |= CEPH_CAP_AUTH_EXCL; + kfree(ci->fscrypt_auth); + ci->fscrypt_auth = (u8 *)cia->fscrypt_auth; + ci->fscrypt_auth_len = len; + } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || + ci->fscrypt_auth_len != len || + memcmp(ci->fscrypt_auth, cia->fscrypt_auth, len)) { + req->r_fscrypt_auth = cia->fscrypt_auth; + mask |= CEPH_SETATTR_FSCRYPT_AUTH; + release |= CEPH_CAP_AUTH_SHARED; + } + cia->fscrypt_auth = NULL; + } +#else + if (cia && cia->fscrypt_auth) { + err = -EINVAL; + spin_unlock(&ci->i_ceph_lock); + goto out; + } +#endif /* CONFIG_FS_ENCRYPTION */ if (ia_valid & ATTR_UID) { dout("setattr %p uid %d -> %d\n", inode, @@ -2282,6 +2341,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) req->r_stamp = attr->ia_ctime; err = ceph_mdsc_do_request(mdsc, NULL, req); } +out: dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, ceph_cap_string(dirtied), mask); @@ -2322,7 +2382,7 @@ int ceph_setattr(struct mnt_idmap *idmap, struct dentry *dentry, ceph_quota_is_max_bytes_exceeded(inode, attr->ia_size)) return -EDQUOT; - err = __ceph_setattr(inode, attr); + err = __ceph_setattr(inode, attr, NULL); if (err >= 0 && (attr->ia_valid & ATTR_MODE)) err = posix_acl_chmod(&nop_mnt_idmap, dentry, attr->ia_mode); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 77b5e7b2e5dd..c3927dab5a3b 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -15,6 +15,7 @@ #include "super.h" #include "mds_client.h" +#include "crypto.h" #include #include @@ -184,8 +185,54 @@ static int parse_reply_info_in(void **p, void *end, info->rsnaps = 0; } + if (struct_v >= 5) { + u32 alen; + + ceph_decode_32_safe(p, end, alen, bad); + + while (alen--) { + u32 len; + + /* key */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + /* value */ + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_skip_n(p, end, len, bad); + } + } + + /* fscrypt flag -- ignore */ + if (struct_v >= 6) + ceph_decode_skip_8(p, end, bad); + + info->fscrypt_auth = NULL; + info->fscrypt_auth_len = 0; + info->fscrypt_file = NULL; + info->fscrypt_file_len = 0; + if (struct_v >= 7) { + ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad); + if (info->fscrypt_auth_len) { + info->fscrypt_auth = kmalloc(info->fscrypt_auth_len, + GFP_KERNEL); + if (!info->fscrypt_auth) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_auth, + info->fscrypt_auth_len, bad); + } + ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad); + if (info->fscrypt_file_len) { + info->fscrypt_file = kmalloc(info->fscrypt_file_len, + GFP_KERNEL); + if (!info->fscrypt_file) + return -ENOMEM; + ceph_decode_copy_safe(p, end, info->fscrypt_file, + info->fscrypt_file_len, bad); + } + } *p = end; } else { + /* legacy (unversioned) struct */ if (features & CEPH_FEATURE_MDS_INLINE_DATA) { ceph_decode_64_safe(p, end, info->inline_version, bad); ceph_decode_32_safe(p, end, info->inline_len, bad); @@ -651,8 +698,21 @@ out_bad: static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) { + int i; + + kfree(info->diri.fscrypt_auth); + kfree(info->diri.fscrypt_file); + kfree(info->targeti.fscrypt_auth); + kfree(info->targeti.fscrypt_file); if (!info->dir_entries) return; + + for (i = 0; i < info->dir_nr; i++) { + struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; + + kfree(rde->inode.fscrypt_auth); + kfree(rde->inode.fscrypt_file); + } free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); } @@ -966,6 +1026,7 @@ void ceph_mdsc_release_request(struct kref *kref) put_cred(req->r_cred); if (req->r_pagelist) ceph_pagelist_release(req->r_pagelist); + kfree(req->r_fscrypt_auth); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); WARN_ON_ONCE(!list_empty(&req->r_wait)); @@ -2543,8 +2604,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, return r; } -static void encode_timestamp_and_gids(void **p, - const struct ceph_mds_request *req) +static void encode_mclientrequest_tail(void **p, + const struct ceph_mds_request *req) { struct ceph_timespec ts; int i; @@ -2557,6 +2618,20 @@ static void encode_timestamp_and_gids(void **p, for (i = 0; i < req->r_cred->group_info->ngroups; i++) ceph_encode_64(p, from_kgid(&init_user_ns, req->r_cred->group_info->gid[i])); + + /* v5: altname (TODO: skip for now) */ + ceph_encode_32(p, 0); + + /* v6: fscrypt_auth and fscrypt_file */ + if (req->r_fscrypt_auth) { + u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + ceph_encode_32(p, authlen); + ceph_encode_copy(p, req->r_fscrypt_auth, authlen); + } else { + ceph_encode_32(p, 0); + } + ceph_encode_32(p, 0); // fscrypt_file for now } /* @@ -2605,12 +2680,14 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, goto out_free1; } + /* head */ len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); - len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + - sizeof(struct ceph_timespec); - len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); - /* calculate (max) length for cap releases */ + /* filepaths */ + len += 2 * (1 + sizeof(u32) + sizeof(u64)); + len += pathlen1 + pathlen2; + + /* cap releases */ len += sizeof(struct ceph_mds_request_release) * (!!req->r_inode_drop + !!req->r_dentry_drop + !!req->r_old_inode_drop + !!req->r_old_dentry_drop); @@ -2620,6 +2697,25 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, if (req->r_old_dentry_drop) len += pathlen2; + /* MClientRequest tail */ + + /* req->r_stamp */ + len += sizeof(struct ceph_timespec); + + /* gid list */ + len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); + + /* alternate name */ + len += sizeof(u32); // TODO + + /* fscrypt_auth */ + len += sizeof(u32); // fscrypt_auth + if (req->r_fscrypt_auth) + len += ceph_fscrypt_auth_len(req->r_fscrypt_auth); + + /* fscrypt_file */ + len += sizeof(u32); + msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); @@ -2639,7 +2735,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, } else { struct ceph_mds_request_head *new_head = msg->front.iov_base; - msg->hdr.version = cpu_to_le16(4); + msg->hdr.version = cpu_to_le16(6); new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; p = msg->front.iov_base + sizeof(*new_head); @@ -2690,7 +2786,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, head->num_releases = cpu_to_le16(releases); - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); if (WARN_ON_ONCE(p > end)) { ceph_msg_put(msg); @@ -2820,7 +2916,7 @@ static int __prepare_send_request(struct ceph_mds_session *session, rhead->num_releases = 0; p = msg->front.iov_base + req->r_request_release_offset; - encode_timestamp_and_gids(&p, req); + encode_mclientrequest_tail(&p, req); msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3c1aa99c1876..a2e85fb5aab1 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -86,6 +86,10 @@ struct ceph_mds_reply_info_in { s32 dir_pin; struct ceph_timespec btime; struct ceph_timespec snap_btime; + u8 *fscrypt_auth; + u8 *fscrypt_file; + u32 fscrypt_auth_len; + u32 fscrypt_file_len; u64 rsnaps; u64 change_attr; }; @@ -278,6 +282,9 @@ struct ceph_mds_request { struct mutex r_fill_mutex; union ceph_mds_request_args r_args; + + struct ceph_fscrypt_auth *r_fscrypt_auth; + int r_fmode; /* file mode, if expecting cap */ int r_request_release_offset; const struct cred *r_cred; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index d6a1790f6923..c4ab2db85ef0 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -20,6 +20,7 @@ #include "super.h" #include "mds_client.h" #include "cache.h" +#include "crypto.h" #include #include @@ -1135,6 +1136,8 @@ static int ceph_set_super(struct super_block *s, struct fs_context *fc) s->s_time_max = U32_MAX; s->s_flags |= SB_NODIRATIME | SB_NOATIME; + ceph_fscrypt_set_ops(s); + ret = set_anon_super_fc(s, fc); if (ret != 0) fsc->sb = NULL; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index eb621bb276bd..3a39a9b3bc33 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -450,6 +450,13 @@ struct ceph_inode_info { struct work_struct i_work; unsigned long i_work_mask; + +#ifdef CONFIG_FS_ENCRYPTION + u32 fscrypt_auth_len; + u32 fscrypt_file_len; + u8 *fscrypt_auth; + u8 *fscrypt_file; +#endif }; struct ceph_netfs_request_data { @@ -1073,7 +1080,13 @@ static inline int ceph_do_getattr(struct inode *inode, int mask, bool force) } extern int ceph_permission(struct mnt_idmap *idmap, struct inode *inode, int mask); -extern int __ceph_setattr(struct inode *inode, struct iattr *attr); + +struct ceph_iattr { + struct ceph_fscrypt_auth *fscrypt_auth; +}; + +extern int __ceph_setattr(struct inode *inode, struct iattr *attr, + struct ceph_iattr *cia); extern int ceph_setattr(struct mnt_idmap *idmap, struct dentry *dentry, struct iattr *attr); extern int ceph_getattr(struct mnt_idmap *idmap, diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 49586ff26152..45f8ce61e103 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -359,14 +359,19 @@ enum { extern const char *ceph_mds_op_name(int op); - -#define CEPH_SETATTR_MODE 1 -#define CEPH_SETATTR_UID 2 -#define CEPH_SETATTR_GID 4 -#define CEPH_SETATTR_MTIME 8 -#define CEPH_SETATTR_ATIME 16 -#define CEPH_SETATTR_SIZE 32 -#define CEPH_SETATTR_CTIME 64 +#define CEPH_SETATTR_MODE (1 << 0) +#define CEPH_SETATTR_UID (1 << 1) +#define CEPH_SETATTR_GID (1 << 2) +#define CEPH_SETATTR_MTIME (1 << 3) +#define CEPH_SETATTR_ATIME (1 << 4) +#define CEPH_SETATTR_SIZE (1 << 5) +#define CEPH_SETATTR_CTIME (1 << 6) +#define CEPH_SETATTR_MTIME_NOW (1 << 7) +#define CEPH_SETATTR_ATIME_NOW (1 << 8) +#define CEPH_SETATTR_BTIME (1 << 9) +#define CEPH_SETATTR_KILL_SGUID (1 << 10) +#define CEPH_SETATTR_FSCRYPT_AUTH (1 << 11) +#define CEPH_SETATTR_FSCRYPT_FILE (1 << 12) /* * Ceph setxattr request flags. -- cgit v1.2.3 From 69dd3b3930f96b624228000921f417fb0919a6ab Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 25 Aug 2022 09:31:05 -0400 Subject: libceph: add CEPH_OSD_OP_ASSERT_VER support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ...and record the user_version in the reply in a new field in ceph_osd_request, so we can populate the assert_ver appropriately. Shuffle the fields a bit too so that the new field fits in an existing hole on x86_64. Signed-off-by: Jeff Layton Reviewed-by: Xiubo Li Reviewed-and-tested-by: Luís Henriques Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 6 +++++- include/linux/ceph/rados.h | 4 ++++ net/ceph/osd_client.c | 5 +++++ 3 files changed, 14 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 8f5d2b5bbba2..bf9823956758 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -198,6 +198,9 @@ struct ceph_osd_req_op { u32 src_fadvise_flags; struct ceph_osd_data osd_data; } copy_from; + struct { + u64 ver; + } assert_ver; }; }; @@ -252,6 +255,7 @@ struct ceph_osd_request { struct ceph_osd_client *r_osdc; struct kref r_kref; bool r_mempool; + bool r_linger; /* don't resend on failure */ struct completion r_completion; /* private to osd_client.c */ ceph_osdc_callback_t r_callback; @@ -264,9 +268,9 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* for writes */ struct timespec64 r_mtime; /* ditto */ u64 r_data_offset; /* ditto */ - bool r_linger; /* don't resend on failure */ /* internal */ + u64 r_version; /* data version sent in reply */ unsigned long r_stamp; /* jiffies, send or check time */ unsigned long r_start_stamp; /* jiffies */ ktime_t r_start_latency; /* ktime_t */ diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 43a7a1573b51..73c3efbec36c 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -523,6 +523,10 @@ struct ceph_osd_op { struct { __le64 cookie; } __attribute__ ((packed)) notify; + struct { + __le64 unused; + __le64 ver; + } __attribute__ ((packed)) assert_ver; struct { __le64 offset, length; __le64 src_offset; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 684faf8553de..7f159e40cf9c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1048,6 +1048,10 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, dst->copy_from.src_fadvise_flags = cpu_to_le32(src->copy_from.src_fadvise_flags); break; + case CEPH_OSD_OP_ASSERT_VER: + dst->assert_ver.unused = cpu_to_le64(0); + dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -3859,6 +3863,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) * one (type of) reply back. */ WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); + req->r_version = m.user_version; req->r_result = m.result ?: data_len; finish_request(req); mutex_unlock(&osd->lock); -- cgit v1.2.3 From 3af5ae22030cb59fab4fba35f5a2b62f47e14df9 Mon Sep 17 00:00:00 2001 From: Xiubo Li Date: Tue, 25 Jul 2023 09:44:40 +0800 Subject: ceph: make members in struct ceph_mds_request_args_ext a union In ceph mainline it will allow to set the btime in the setattr request and just add a 'btime' member in the union 'ceph_mds_request_args' and then bump up the header version to 4. That means the total size of union 'ceph_mds_request_args' will increase sizeof(struct ceph_timespec) bytes, but in kclient it will increase the sizeof(setattr_ext) bytes for each request. Since the MDS will always depend on the header's vesion and front_len members to decode the 'ceph_mds_request_head' struct, at the same time kclient hasn't supported the 'btime' feature yet in setattr request, so it's safe to do this change here. This will save 48 bytes memories for each request. Fixes: 4f1ddb1ea874 ("ceph: implement updated ceph_mds_request_head structure") Signed-off-by: Xiubo Li Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- include/linux/ceph/ceph_fs.h | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) (limited to 'include') diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 45f8ce61e103..ae44812eb6d4 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -467,17 +467,19 @@ union ceph_mds_request_args { } __attribute__ ((packed)); union ceph_mds_request_args_ext { - union ceph_mds_request_args old; - struct { - __le32 mode; - __le32 uid; - __le32 gid; - struct ceph_timespec mtime; - struct ceph_timespec atime; - __le64 size, old_size; /* old_size needed by truncate */ - __le32 mask; /* CEPH_SETATTR_* */ - struct ceph_timespec btime; - } __attribute__ ((packed)) setattr_ext; + union { + union ceph_mds_request_args old; + struct { + __le32 mode; + __le32 uid; + __le32 gid; + struct ceph_timespec mtime; + struct ceph_timespec atime; + __le64 size, old_size; /* old_size needed by truncate */ + __le32 mask; /* CEPH_SETATTR_* */ + struct ceph_timespec btime; + } __attribute__ ((packed)) setattr_ext; + }; }; #define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */ -- cgit v1.2.3 From ce0d5bd3a6c176f9a3bf867624a07119dd4d0878 Mon Sep 17 00:00:00 2001 From: Xiubo Li Date: Tue, 25 Jul 2023 17:51:59 +0800 Subject: ceph: make num_fwd and num_retry to __u32 The num_fwd in MClientRequestForward is int32_t, while the num_fwd in ceph_mds_request_head is __u8. This is buggy when the num_fwd is larger than 256 it will always be truncate to 0 again. But the client couldn't recoginize this. This will make them to __u32 instead. Because the old cephs will directly copy the raw memories when decoding the reqeust's head, so we need to make sure this kclient will be compatible with old cephs. For newer cephs they will decode the requests depending the version, which will be much simpler and easier to extend new members. Link: https://tracker.ceph.com/issues/62145 Signed-off-by: Xiubo Li Reviewed-by: Alexander Mikhalitsyn Reviewed-by: Milind Changire Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 189 +++++++++++++++++++++++-------------------- fs/ceph/mds_client.h | 4 +- include/linux/ceph/ceph_fs.h | 23 +++++- 3 files changed, 126 insertions(+), 90 deletions(-) (limited to 'include') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 04a881343e43..615db141b6c4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2840,6 +2840,18 @@ static void encode_mclientrequest_tail(void **p, } } +static struct ceph_mds_request_head_legacy * +find_legacy_request_head(void *p, u64 features) +{ + bool legacy = !(features & CEPH_FEATURE_FS_BTIME); + struct ceph_mds_request_head_old *ohead; + + if (legacy) + return (struct ceph_mds_request_head_legacy *)p; + ohead = (struct ceph_mds_request_head_old *)p; + return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid; +} + /* * called under mdsc->mutex */ @@ -2850,7 +2862,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_msg *msg; - struct ceph_mds_request_head_old *head; + struct ceph_mds_request_head_legacy *lhead; const char *path1 = NULL; const char *path2 = NULL; u64 ino1 = 0, ino2 = 0; @@ -2862,6 +2874,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, void *p, *end; int ret; bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); + bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, + &session->s_features); ret = set_request_path_attr(req->r_inode, req->r_dentry, req->r_parent, req->r_path1, req->r_ino1.ino, @@ -2893,7 +2907,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, goto out_free2; } - len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); + /* + * For old cephs without supporting the 32bit retry/fwd feature + * it will copy the raw memories directly when decoding the + * requests. While new cephs will decode the head depending the + * version member, so we need to make sure it will be compatible + * with them both. + */ + if (legacy) + len = sizeof(struct ceph_mds_request_head_legacy); + else if (old_version) + len = sizeof(struct ceph_mds_request_head_old); + else + len = sizeof(struct ceph_mds_request_head); /* filepaths */ len += 2 * (1 + sizeof(u32) + sizeof(u64)); @@ -2938,33 +2964,40 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, msg->hdr.tid = cpu_to_le64(req->r_tid); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + /* - * The old ceph_mds_request_head didn't contain a version field, and + * The ceph_mds_request_head_legacy didn't contain a version field, and * one was added when we moved the message version from 3->4. */ if (legacy) { msg->hdr.version = cpu_to_le16(3); - head = msg->front.iov_base; - p = msg->front.iov_base + sizeof(*head); + p = msg->front.iov_base + sizeof(*lhead); + } else if (old_version) { + struct ceph_mds_request_head_old *ohead = msg->front.iov_base; + + msg->hdr.version = cpu_to_le16(4); + ohead->version = cpu_to_le16(1); + p = msg->front.iov_base + sizeof(*ohead); } else { - struct ceph_mds_request_head *new_head = msg->front.iov_base; + struct ceph_mds_request_head *nhead = msg->front.iov_base; msg->hdr.version = cpu_to_le16(6); - new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); - head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; - p = msg->front.iov_base + sizeof(*new_head); + nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); + p = msg->front.iov_base + sizeof(*nhead); } end = msg->front.iov_base + msg->front.iov_len; - head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); - head->op = cpu_to_le32(req->r_op); - head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, - req->r_cred->fsuid)); - head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, - req->r_cred->fsgid)); - head->ino = cpu_to_le64(req->r_deleg_ino); - head->args = req->r_args; + lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); + lhead->op = cpu_to_le32(req->r_op); + lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, + req->r_cred->fsuid)); + lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, + req->r_cred->fsgid)); + lhead->ino = cpu_to_le64(req->r_deleg_ino); + lhead->args = req->r_args; ceph_encode_filepath(&p, end, ino1, path1); ceph_encode_filepath(&p, end, ino2, path2); @@ -3006,7 +3039,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session, p = msg->front.iov_base + req->r_request_release_offset; } - head->num_releases = cpu_to_le16(releases); + lhead->num_releases = cpu_to_le16(releases); encode_mclientrequest_tail(&p, req); @@ -3057,18 +3090,6 @@ static void complete_request(struct ceph_mds_client *mdsc, complete_all(&req->r_completion); } -static struct ceph_mds_request_head_old * -find_old_request_head(void *p, u64 features) -{ - bool legacy = !(features & CEPH_FEATURE_FS_BTIME); - struct ceph_mds_request_head *new_head; - - if (legacy) - return (struct ceph_mds_request_head_old *)p; - new_head = (struct ceph_mds_request_head *)p; - return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; -} - /* * called under mdsc->mutex */ @@ -3078,29 +3099,28 @@ static int __prepare_send_request(struct ceph_mds_session *session, { int mds = session->s_mds; struct ceph_mds_client *mdsc = session->s_mdsc; - struct ceph_mds_request_head_old *rhead; + struct ceph_mds_request_head_legacy *lhead; + struct ceph_mds_request_head *nhead; struct ceph_msg *msg; - int flags = 0, max_retry; + int flags = 0, old_max_retry; + bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, + &session->s_features); /* - * The type of 'r_attempts' in kernel 'ceph_mds_request' - * is 'int', while in 'ceph_mds_request_head' the type of - * 'num_retry' is '__u8'. So in case the request retries - * exceeding 256 times, the MDS will receive a incorrect - * retry seq. - * - * In this case it's ususally a bug in MDS and continue - * retrying the request makes no sense. - * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * Avoid inifinite retrying after overflow. The client will + * increase the retry count and if the MDS is old version, + * so we limit to retry at most 256 times. */ - max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); - max_retry = 1 << (max_retry * BITS_PER_BYTE); - if (req->r_attempts >= max_retry) { - pr_warn_ratelimited("%s request tid %llu seq overflow\n", - __func__, req->r_tid); - return -EMULTIHOP; + if (req->r_attempts) { + old_max_retry = sizeof_field(struct ceph_mds_request_head_old, + num_retry); + old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); + if ((old_version && req->r_attempts >= old_max_retry) || + ((uint32_t)req->r_attempts >= U32_MAX)) { + pr_warn_ratelimited("%s request tid %llu seq overflow\n", + __func__, req->r_tid); + return -EMULTIHOP; + } } req->r_attempts++; @@ -3126,20 +3146,24 @@ static int __prepare_send_request(struct ceph_mds_session *session, * d_move mangles the src name. */ msg = req->r_request; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); - flags = le32_to_cpu(rhead->flags); + flags = le32_to_cpu(lhead->flags); flags |= CEPH_MDS_FLAG_REPLAY; - rhead->flags = cpu_to_le32(flags); + lhead->flags = cpu_to_le32(flags); if (req->r_target_inode) - rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); + lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); - rhead->num_retry = req->r_attempts - 1; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } /* remove cap/dentry releases from message */ - rhead->num_releases = 0; + lhead->num_releases = 0; p = msg->front.iov_base + req->r_request_release_offset; encode_mclientrequest_tail(&p, req); @@ -3160,18 +3184,23 @@ static int __prepare_send_request(struct ceph_mds_session *session, } req->r_request = msg; - rhead = find_old_request_head(msg->front.iov_base, - session->s_con.peer_features); - rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); + lhead = find_legacy_request_head(msg->front.iov_base, + session->s_con.peer_features); + lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_ASYNC; if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; - rhead->flags = cpu_to_le32(flags); - rhead->num_fwd = req->r_num_fwd; - rhead->num_retry = req->r_attempts - 1; + lhead->flags = cpu_to_le32(flags); + lhead->num_fwd = req->r_num_fwd; + lhead->num_retry = req->r_attempts - 1; + if (!old_version) { + nhead = (struct ceph_mds_request_head*)msg->front.iov_base; + nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd); + nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1); + } dout(" r_parent = %p\n", req->r_parent); return 0; @@ -3830,33 +3859,21 @@ static void handle_forward(struct ceph_mds_client *mdsc, if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { dout("forward tid %llu aborted, unregistering\n", tid); __unregister_request(mdsc, req); - } else if (fwd_seq <= req->r_num_fwd) { + } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { /* - * The type of 'num_fwd' in ceph 'MClientRequestForward' - * is 'int32_t', while in 'ceph_mds_request_head' the - * type is '__u8'. So in case the request bounces between - * MDSes exceeding 256 times, the client will get stuck. - * - * In this case it's ususally a bug in MDS and continue - * bouncing the request makes no sense. + * Avoid inifinite retrying after overflow. * - * In future this could be fixed in ceph code, so avoid - * using the hardcode here. + * The MDS will increase the fwd count and in client side + * if the num_fwd is less than the one saved in request + * that means the MDS is an old version and overflowed of + * 8 bits. */ - int max = sizeof_field(struct ceph_mds_request_head, num_fwd); - max = 1 << (max * BITS_PER_BYTE); - if (req->r_num_fwd >= max) { - mutex_lock(&req->r_fill_mutex); - req->r_err = -EMULTIHOP; - set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); - mutex_unlock(&req->r_fill_mutex); - aborted = true; - pr_warn_ratelimited("forward tid %llu seq overflow\n", - tid); - } else { - dout("forward tid %llu to mds%d - old seq %d <= %d\n", - tid, next_mds, req->r_num_fwd, fwd_seq); - } + mutex_lock(&req->r_fill_mutex); + req->r_err = -EMULTIHOP; + set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); + mutex_unlock(&req->r_fill_mutex); + aborted = true; + pr_warn_ratelimited("forward tid %llu seq overflow\n", tid); } else { /* resend. forward race not possible; mds would drop */ dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 1fa0f78b7b79..5a3714bdd64a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -32,8 +32,9 @@ enum ceph_feature_type { CEPHFS_FEATURE_ALTERNATE_NAME, CEPHFS_FEATURE_NOTIFY_SESSION_STATE, CEPHFS_FEATURE_OP_GETVXATTR, + CEPHFS_FEATURE_32BITS_RETRY_FWD, - CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_OP_GETVXATTR, + CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_32BITS_RETRY_FWD, }; #define CEPHFS_FEATURES_CLIENT_SUPPORTED { \ @@ -47,6 +48,7 @@ enum ceph_feature_type { CEPHFS_FEATURE_ALTERNATE_NAME, \ CEPHFS_FEATURE_NOTIFY_SESSION_STATE, \ CEPHFS_FEATURE_OP_GETVXATTR, \ + CEPHFS_FEATURE_32BITS_RETRY_FWD, \ } /* diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index ae44812eb6d4..5f2301ee88bc 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -486,7 +486,7 @@ union ceph_mds_request_args_ext { #define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */ #define CEPH_MDS_FLAG_ASYNC 4 /* request is asynchronous */ -struct ceph_mds_request_head_old { +struct ceph_mds_request_head_legacy { __le64 oldest_client_tid; __le32 mdsmap_epoch; /* on client */ __le32 flags; /* CEPH_MDS_FLAG_* */ @@ -499,9 +499,9 @@ struct ceph_mds_request_head_old { union ceph_mds_request_args args; } __attribute__ ((packed)); -#define CEPH_MDS_REQUEST_HEAD_VERSION 1 +#define CEPH_MDS_REQUEST_HEAD_VERSION 2 -struct ceph_mds_request_head { +struct ceph_mds_request_head_old { __le16 version; /* struct version */ __le64 oldest_client_tid; __le32 mdsmap_epoch; /* on client */ @@ -515,6 +515,23 @@ struct ceph_mds_request_head { union ceph_mds_request_args_ext args; } __attribute__ ((packed)); +struct ceph_mds_request_head { + __le16 version; /* struct version */ + __le64 oldest_client_tid; + __le32 mdsmap_epoch; /* on client */ + __le32 flags; /* CEPH_MDS_FLAG_* */ + __u8 num_retry, num_fwd; /* legacy count retry and fwd attempts */ + __le16 num_releases; /* # include cap/lease release records */ + __le32 op; /* mds op code */ + __le32 caller_uid, caller_gid; + __le64 ino; /* use this ino for openc, mkdir, mknod, + etc. (if replaying) */ + union ceph_mds_request_args_ext args; + + __le32 ext_num_retry; /* new count retry attempts */ + __le32 ext_num_fwd; /* new count fwd attempts */ +} __attribute__ ((packed)); + /* cap/lease release record */ struct ceph_mds_request_release { __le64 ino, cap_id; /* ino and unique cap id */ -- cgit v1.2.3