diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 334 |
1 files changed, 324 insertions, 10 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 658a6f2320cf..d3a759e052c8 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,6 +171,13 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, osd_data->num_bvecs = num_bvecs; } +static void ceph_osd_iter_init(struct ceph_osd_data *osd_data, + struct iov_iter *iter) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_ITER; + osd_data->iter = *iter; +} + static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) { @@ -264,6 +271,22 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos); +/** + * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer + * @osd_req: The request to set up + * @which: Index of the operation in which to set the iter + * @iter: The buffer iterator + */ +void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req, + unsigned int which, struct iov_iter *iter) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); + ceph_osd_iter_init(osd_data, iter); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_iter); + static void osd_req_op_cls_request_info_pagelist( struct ceph_osd_request *osd_req, unsigned int which, struct ceph_pagelist *pagelist) @@ -346,6 +369,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) #endif /* CONFIG_BLOCK */ case CEPH_OSD_DATA_TYPE_BVECS: return osd_data->bvec_pos.iter.bi_size; + case CEPH_OSD_DATA_TYPE_ITER: + return iov_iter_count(&osd_data->iter); default: WARN(true, "unrecognized data type %d\n", (int)osd_data->type); return 0; @@ -376,8 +401,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, switch (op->op) { case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: + kfree(op->extent.sparse_ext); ceph_osd_data_release(&op->extent.osd_data); break; case CEPH_OSD_OP_CALL: @@ -669,6 +696,7 @@ static void get_num_data_items(struct ceph_osd_request *req, /* reply */ case CEPH_OSD_OP_STAT: case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_LIST_WATCHERS: *num_reply_data_items += 1; break; @@ -738,7 +766,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && - opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ); op->extent.offset = offset; op->extent.length = length; @@ -951,6 +979,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, #endif } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) { ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos); + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) { + ceph_msg_data_add_iter(msg, &osd_data->iter); } else { BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); } @@ -963,6 +993,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_STAT: break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: @@ -1017,6 +1048,10 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, dst->copy_from.src_fadvise_flags = cpu_to_le32(src->copy_from.src_fadvise_flags); break; + case CEPH_OSD_OP_ASSERT_VER: + dst->assert_ver.unused = cpu_to_le64(0); + dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -1059,7 +1094,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && - opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); + opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE && + opcode != CEPH_OSD_OP_SPARSE_READ); req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); @@ -1100,15 +1136,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (flags & CEPH_OSD_FLAG_WRITE) req->r_data_offset = off; - if (num_ops > 1) + if (num_ops > 1) { + int num_req_ops, num_rep_ops; + /* - * This is a special case for ceph_writepages_start(), but it - * also covers ceph_uninline_data(). If more multi-op request - * use cases emerge, we will need a separate helper. + * If this is a multi-op write request, assume that we'll need + * request ops. If it's a multi-op read then assume we'll need + * reply ops. Anything else and call it -EINVAL. */ - r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0); - else + if (flags & CEPH_OSD_FLAG_WRITE) { + num_req_ops = num_ops; + num_rep_ops = 0; + } else if (flags & CEPH_OSD_FLAG_READ) { + num_req_ops = 0; + num_rep_ops = num_ops; + } else { + r = -EINVAL; + goto fail; + } + + r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops, + num_rep_ops); + } else { r = ceph_osdc_alloc_messages(req, GFP_NOFS); + } if (r) goto fail; @@ -1120,6 +1171,18 @@ fail: } EXPORT_SYMBOL(ceph_osdc_new_request); +int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt) +{ + op->extent.sparse_ext_cnt = cnt; + op->extent.sparse_ext = kmalloc_array(cnt, + sizeof(*op->extent.sparse_ext), + GFP_NOFS); + if (!op->extent.sparse_ext) + return -ENOMEM; + return 0; +} +EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map); + /* * We keep osd requests in an rbtree, sorted by ->r_tid. */ @@ -1177,6 +1240,7 @@ static void osd_init(struct ceph_osd *osd) { refcount_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); + spin_lock_init(&osd->o_requests_lock); osd->o_requests = RB_ROOT; osd->o_linger_requests = RB_ROOT; osd->o_backoff_mappings = RB_ROOT; @@ -1187,6 +1251,13 @@ static void osd_init(struct ceph_osd *osd) mutex_init(&osd->lock); } +static void ceph_init_sparse_read(struct ceph_sparse_read *sr) +{ + kfree(sr->sr_extent); + memset(sr, '\0', sizeof(*sr)); + sr->sr_state = CEPH_SPARSE_READ_HDR; +} + static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); @@ -1197,6 +1268,8 @@ static void osd_cleanup(struct ceph_osd *osd) WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); + ceph_init_sparse_read(&osd->o_sparse_read); + if (osd->o_auth.authorizer) { WARN_ON(osd_homeless(osd)); ceph_auth_destroy_authorizer(osd->o_auth.authorizer); @@ -1216,6 +1289,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) osd_init(osd); osd->o_osdc = osdc; osd->o_osd = onum; + osd->o_sparse_op_idx = -1; + + ceph_init_sparse_read(&osd->o_sparse_read); ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); @@ -1406,7 +1482,9 @@ static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) atomic_inc(&osd->o_osdc->num_homeless); get_osd(osd); + spin_lock(&osd->o_requests_lock); insert_request(&osd->o_requests, req); + spin_unlock(&osd->o_requests_lock); req->r_osd = osd; } @@ -1418,7 +1496,9 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) req, req->r_tid); req->r_osd = NULL; + spin_lock(&osd->o_requests_lock); erase_request(&osd->o_requests, req); + spin_unlock(&osd->o_requests_lock); put_osd(osd); if (!osd_homeless(osd)) @@ -2016,6 +2096,7 @@ static void setup_request_data(struct ceph_osd_request *req) &op->raw_data_in); break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; @@ -2435,8 +2516,10 @@ static void finish_request(struct ceph_osd_request *req) req->r_end_latency = ktime_get(); - if (req->r_osd) + if (req->r_osd) { + ceph_init_sparse_read(&req->r_osd->o_sparse_read); unlink_request(req->r_osd, req); + } atomic_dec(&osdc->num_requests); /* @@ -3795,6 +3878,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) * one (type of) reply back. */ WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); + req->r_version = m.user_version; req->r_result = m.result ?: data_len; finish_request(req); mutex_unlock(&osd->lock); @@ -5348,6 +5432,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_msg_put(msg); } +/* How much sparse data was requested? */ +static u64 sparse_data_requested(struct ceph_osd_request *req) +{ + u64 len = 0; + + if (req->r_flags & CEPH_OSD_FLAG_READ) { + int i; + + for (i = 0; i < req->r_num_ops; ++i) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + if (op->op == CEPH_OSD_OP_SPARSE_READ) + len += op->extent.length; + } + } + return len; +} + /* * Lookup and return message for incoming reply. Don't try to do * anything about a larger than preallocated data portion of the @@ -5364,6 +5466,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid = le64_to_cpu(hdr->tid); + u64 srlen; down_read(&osdc->lock); if (!osd_registered(osd)) { @@ -5396,7 +5499,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply = m; } - if (data_len > req->r_reply->data_length) { + srlen = sparse_data_requested(req); + if (!srlen && data_len > req->r_reply->data_length) { pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", __func__, osd->o_osd, req->r_tid, data_len, req->r_reply->data_length); @@ -5406,6 +5510,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } m = ceph_msg_get(req->r_reply); + m->sparse_read = (bool)srlen; + dout("get_reply tid %lld %p\n", tid, m); out_unlock_session: @@ -5638,9 +5744,217 @@ static int osd_check_message_signature(struct ceph_msg *msg) return ceph_auth_check_message_signature(auth, msg); } +static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len, + bool zero) +{ + while (len) { + struct page *page; + size_t poff, plen; + + page = ceph_msg_data_next(cursor, &poff, &plen); + if (plen > len) + plen = len; + if (zero) + zero_user_segment(page, poff, poff + plen); + len -= plen; + ceph_msg_data_advance(cursor, plen); + } +} + +static int prep_next_sparse_read(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor) +{ + struct ceph_osd *o = con->private; + struct ceph_sparse_read *sr = &o->o_sparse_read; + struct ceph_osd_request *req; + struct ceph_osd_req_op *op; + + spin_lock(&o->o_requests_lock); + req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid)); + if (!req) { + spin_unlock(&o->o_requests_lock); + return -EBADR; + } + + if (o->o_sparse_op_idx < 0) { + u64 srlen = sparse_data_requested(req); + + dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n", + __func__, o->o_osd, srlen); + ceph_msg_data_cursor_init(cursor, con->in_msg, srlen); + } else { + u64 end; + + op = &req->r_ops[o->o_sparse_op_idx]; + + WARN_ON_ONCE(op->extent.sparse_ext); + + /* hand back buffer we took earlier */ + op->extent.sparse_ext = sr->sr_extent; + sr->sr_extent = NULL; + op->extent.sparse_ext_cnt = sr->sr_count; + sr->sr_ext_len = 0; + dout("%s: [%d] completed extent array len %d cursor->resid %zd\n", + __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid); + /* Advance to end of data for this operation */ + end = ceph_sparse_ext_map_end(op); + if (end < sr->sr_req_len) + advance_cursor(cursor, sr->sr_req_len - end, false); + } + + ceph_init_sparse_read(sr); + + /* find next op in this request (if any) */ + while (++o->o_sparse_op_idx < req->r_num_ops) { + op = &req->r_ops[o->o_sparse_op_idx]; + if (op->op == CEPH_OSD_OP_SPARSE_READ) + goto found; + } + + /* reset for next sparse read request */ + spin_unlock(&o->o_requests_lock); + o->o_sparse_op_idx = -1; + return 0; +found: + sr->sr_req_off = op->extent.offset; + sr->sr_req_len = op->extent.length; + sr->sr_pos = sr->sr_req_off; + dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__, + o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len); + + /* hand off request's sparse extent map buffer */ + sr->sr_ext_len = op->extent.sparse_ext_cnt; + op->extent.sparse_ext_cnt = 0; + sr->sr_extent = op->extent.sparse_ext; + op->extent.sparse_ext = NULL; + + spin_unlock(&o->o_requests_lock); + return 1; +} + +#ifdef __BIG_ENDIAN +static inline void convert_extent_map(struct ceph_sparse_read *sr) +{ + int i; + + for (i = 0; i < sr->sr_count; i++) { + struct ceph_sparse_extent *ext = &sr->sr_extent[i]; + + ext->off = le64_to_cpu((__force __le64)ext->off); + ext->len = le64_to_cpu((__force __le64)ext->len); + } +} +#else +static inline void convert_extent_map(struct ceph_sparse_read *sr) +{ +} +#endif + +#define MAX_EXTENTS 4096 + +static int osd_sparse_read(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor, + char **pbuf) +{ + struct ceph_osd *o = con->private; + struct ceph_sparse_read *sr = &o->o_sparse_read; + u32 count = sr->sr_count; + u64 eoff, elen; + int ret; + + switch (sr->sr_state) { + case CEPH_SPARSE_READ_HDR: +next_op: + ret = prep_next_sparse_read(con, cursor); + if (ret <= 0) + return ret; + + /* number of extents */ + ret = sizeof(sr->sr_count); + *pbuf = (char *)&sr->sr_count; + sr->sr_state = CEPH_SPARSE_READ_EXTENTS; + break; + case CEPH_SPARSE_READ_EXTENTS: + /* Convert sr_count to host-endian */ + count = le32_to_cpu((__force __le32)sr->sr_count); + sr->sr_count = count; + dout("[%d] got %u extents\n", o->o_osd, count); + + if (count > 0) { + if (!sr->sr_extent || count > sr->sr_ext_len) { + /* + * Apply a hard cap to the number of extents. + * If we have more, assume something is wrong. + */ + if (count > MAX_EXTENTS) { + dout("%s: OSD returned 0x%x extents in a single reply!\n", + __func__, count); + return -EREMOTEIO; + } + + /* no extent array provided, or too short */ + kfree(sr->sr_extent); + sr->sr_extent = kmalloc_array(count, + sizeof(*sr->sr_extent), + GFP_NOIO); + if (!sr->sr_extent) + return -ENOMEM; + sr->sr_ext_len = count; + } + ret = count * sizeof(*sr->sr_extent); + *pbuf = (char *)sr->sr_extent; + sr->sr_state = CEPH_SPARSE_READ_DATA_LEN; + break; + } + /* No extents? Read data len */ + fallthrough; + case CEPH_SPARSE_READ_DATA_LEN: + convert_extent_map(sr); + ret = sizeof(sr->sr_datalen); + *pbuf = (char *)&sr->sr_datalen; + sr->sr_state = CEPH_SPARSE_READ_DATA; + break; + case CEPH_SPARSE_READ_DATA: + if (sr->sr_index >= count) { + sr->sr_state = CEPH_SPARSE_READ_HDR; + goto next_op; + } + + eoff = sr->sr_extent[sr->sr_index].off; + elen = sr->sr_extent[sr->sr_index].len; + + dout("[%d] ext %d off 0x%llx len 0x%llx\n", + o->o_osd, sr->sr_index, eoff, elen); + + if (elen > INT_MAX) { + dout("Sparse read extent length too long (0x%llx)\n", + elen); + return -EREMOTEIO; + } + + /* zero out anything from sr_pos to start of extent */ + if (sr->sr_pos < eoff) + advance_cursor(cursor, eoff - sr->sr_pos, true); + + /* Set position to end of extent */ + sr->sr_pos = eoff + elen; + + /* send back the new length and nullify the ptr */ + cursor->sr_resid = elen; + ret = elen; + *pbuf = NULL; + + /* Bump the array index */ + ++sr->sr_index; + break; + } + return ret; +} + static const struct ceph_connection_operations osd_con_ops = { .get = osd_get_con, .put = osd_put_con, + .sparse_read = osd_sparse_read, .alloc_msg = osd_alloc_msg, .dispatch = osd_dispatch, .fault = osd_fault, |