summaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c78
-rw-r--r--net/ceph/messenger_v1.c98
-rw-r--r--net/ceph/messenger_v2.c289
-rw-r--r--net/ceph/osd_client.c334
4 files changed, 761 insertions, 38 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5eb4898cccd4..10a41cd9c523 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -969,6 +969,62 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
return true;
}
+static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
+{
+ struct ceph_msg_data *data = cursor->data;
+
+ cursor->iov_iter = data->iter;
+ cursor->lastlen = 0;
+ iov_iter_truncate(&cursor->iov_iter, length);
+ cursor->resid = iov_iter_count(&cursor->iov_iter);
+}
+
+static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
+{
+ struct page *page;
+ ssize_t len;
+
+ if (cursor->lastlen)
+ iov_iter_revert(&cursor->iov_iter, cursor->lastlen);
+
+ len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
+ 1, page_offset);
+ BUG_ON(len < 0);
+
+ cursor->lastlen = len;
+
+ /*
+ * FIXME: The assumption is that the pages represented by the iov_iter
+ * are pinned, with the references held by the upper-level
+ * callers, or by virtue of being under writeback. Eventually,
+ * we'll get an iov_iter_get_pages2 variant that doesn't take
+ * page refs. Until then, just put the page ref.
+ */
+ VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
+ put_page(page);
+
+ *length = min_t(size_t, len, cursor->resid);
+ return page;
+}
+
+static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ BUG_ON(bytes > cursor->resid);
+ cursor->resid -= bytes;
+
+ if (bytes < cursor->lastlen) {
+ cursor->lastlen -= bytes;
+ } else {
+ iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen);
+ cursor->lastlen = 0;
+ }
+
+ return cursor->resid;
+}
+
/*
* Message data is handled (sent or received) in pieces, where each
* piece resides on a single page. The network layer might not
@@ -996,6 +1052,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
case CEPH_MSG_DATA_BVECS:
ceph_msg_data_bvecs_cursor_init(cursor, length);
break;
+ case CEPH_MSG_DATA_ITER:
+ ceph_msg_data_iter_cursor_init(cursor, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
/* BUG(); */
@@ -1013,6 +1072,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
cursor->total_resid = length;
cursor->data = msg->data;
+ cursor->sr_resid = 0;
__ceph_msg_data_cursor_init(cursor);
}
@@ -1042,6 +1102,9 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
case CEPH_MSG_DATA_BVECS:
page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
break;
+ case CEPH_MSG_DATA_ITER:
+ page = ceph_msg_data_iter_next(cursor, page_offset, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
page = NULL;
@@ -1080,6 +1143,9 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
case CEPH_MSG_DATA_BVECS:
new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
break;
+ case CEPH_MSG_DATA_ITER:
+ new_piece = ceph_msg_data_iter_advance(cursor, bytes);
+ break;
case CEPH_MSG_DATA_NONE:
default:
BUG();
@@ -1879,6 +1945,18 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+void ceph_msg_data_add_iter(struct ceph_msg *msg,
+ struct iov_iter *iter)
+{
+ struct ceph_msg_data *data;
+
+ data = ceph_msg_data_add(msg);
+ data->type = CEPH_MSG_DATA_ITER;
+ data->iter = *iter;
+
+ msg->data_length += iov_iter_count(&data->iter);
+}
+
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 3d57bb48a2b4..f9a50d7f0d20 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -159,9 +159,9 @@ static size_t sizeof_footer(struct ceph_connection *con)
static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
- /* Initialize data cursor */
-
- ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
+ /* Initialize data cursor if it's not a sparse read */
+ if (!msg->sparse_read)
+ ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
}
/*
@@ -960,9 +960,9 @@ static void process_ack(struct ceph_connection *con)
prepare_read_tag(con);
}
-static int read_partial_message_section(struct ceph_connection *con,
- struct kvec *section,
- unsigned int sec_len, u32 *crc)
+static int read_partial_message_chunk(struct ceph_connection *con,
+ struct kvec *section,
+ unsigned int sec_len, u32 *crc)
{
int ret, left;
@@ -978,11 +978,91 @@ static int read_partial_message_section(struct ceph_connection *con,
section->iov_len += ret;
}
if (section->iov_len == sec_len)
- *crc = crc32c(0, section->iov_base, section->iov_len);
+ *crc = crc32c(*crc, section->iov_base, section->iov_len);
return 1;
}
+static inline int read_partial_message_section(struct ceph_connection *con,
+ struct kvec *section,
+ unsigned int sec_len, u32 *crc)
+{
+ *crc = 0;
+ return read_partial_message_chunk(con, section, sec_len, crc);
+}
+
+static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
+{
+ struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+ bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
+
+ if (do_bounce && unlikely(!con->bounce_page)) {
+ con->bounce_page = alloc_page(GFP_NOIO);
+ if (!con->bounce_page) {
+ pr_err("failed to allocate bounce page\n");
+ return -ENOMEM;
+ }
+ }
+
+ while (cursor->sr_resid > 0) {
+ struct page *page, *rpage;
+ size_t off, len;
+ int ret;
+
+ page = ceph_msg_data_next(cursor, &off, &len);
+ rpage = do_bounce ? con->bounce_page : page;
+
+ /* clamp to what remains in extent */
+ len = min_t(int, len, cursor->sr_resid);
+ ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
+ if (ret <= 0)
+ return ret;
+ *crc = ceph_crc32c_page(*crc, rpage, off, ret);
+ ceph_msg_data_advance(cursor, (size_t)ret);
+ cursor->sr_resid -= ret;
+ if (do_bounce)
+ memcpy_page(page, off, rpage, off, ret);
+ }
+ return 1;
+}
+
+static int read_sparse_msg_data(struct ceph_connection *con)
+{
+ struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+ u32 crc = 0;
+ int ret = 1;
+
+ if (do_datacrc)
+ crc = con->in_data_crc;
+
+ do {
+ if (con->v1.in_sr_kvec.iov_base)
+ ret = read_partial_message_chunk(con,
+ &con->v1.in_sr_kvec,
+ con->v1.in_sr_len,
+ &crc);
+ else if (cursor->sr_resid > 0)
+ ret = read_sparse_msg_extent(con, &crc);
+
+ if (ret <= 0) {
+ if (do_datacrc)
+ con->in_data_crc = crc;
+ return ret;
+ }
+
+ memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
+ ret = con->ops->sparse_read(con, cursor,
+ (char **)&con->v1.in_sr_kvec.iov_base);
+ con->v1.in_sr_len = ret;
+ } while (ret > 0);
+
+ if (do_datacrc)
+ con->in_data_crc = crc;
+
+ return ret < 0 ? ret : 1; /* must return > 0 to indicate success */
+}
+
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
@@ -1173,7 +1253,9 @@ static int read_partial_message(struct ceph_connection *con)
if (!m->num_data_items)
return -EIO;
- if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
+ if (m->sparse_read)
+ ret = read_sparse_msg_data(con);
+ else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
ret = read_partial_msg_data_bounce(con);
else
ret = read_partial_msg_data(con);
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index 1df1d29dee92..d09a39ff2cf0 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -8,9 +8,9 @@
#include <linux/ceph/ceph_debug.h>
#include <crypto/aead.h>
-#include <crypto/algapi.h> /* for crypto_memneq() */
#include <crypto/hash.h>
#include <crypto/sha2.h>
+#include <crypto/utils.h>
#include <linux/bvec.h>
#include <linux/crc32c.h>
#include <linux/net.h>
@@ -52,14 +52,16 @@
#define FRAME_LATE_STATUS_COMPLETE 0xe
#define FRAME_LATE_STATUS_ABORTED_MASK 0xf
-#define IN_S_HANDLE_PREAMBLE 1
-#define IN_S_HANDLE_CONTROL 2
-#define IN_S_HANDLE_CONTROL_REMAINDER 3
-#define IN_S_PREPARE_READ_DATA 4
-#define IN_S_PREPARE_READ_DATA_CONT 5
-#define IN_S_PREPARE_READ_ENC_PAGE 6
-#define IN_S_HANDLE_EPILOGUE 7
-#define IN_S_FINISH_SKIP 8
+#define IN_S_HANDLE_PREAMBLE 1
+#define IN_S_HANDLE_CONTROL 2
+#define IN_S_HANDLE_CONTROL_REMAINDER 3
+#define IN_S_PREPARE_READ_DATA 4
+#define IN_S_PREPARE_READ_DATA_CONT 5
+#define IN_S_PREPARE_READ_ENC_PAGE 6
+#define IN_S_PREPARE_SPARSE_DATA 7
+#define IN_S_PREPARE_SPARSE_DATA_CONT 8
+#define IN_S_HANDLE_EPILOGUE 9
+#define IN_S_FINISH_SKIP 10
#define OUT_S_QUEUE_DATA 1
#define OUT_S_QUEUE_DATA_CONT 2
@@ -967,12 +969,48 @@ static void init_sgs_cursor(struct scatterlist **sg,
}
}
+/**
+ * init_sgs_pages: set up scatterlist on an array of page pointers
+ * @sg: scatterlist to populate
+ * @pages: pointer to page array
+ * @dpos: position in the array to start (bytes)
+ * @dlen: len to add to sg (bytes)
+ * @pad: pointer to pad destination (if any)
+ *
+ * Populate the scatterlist from the page array, starting at an arbitrary
+ * byte in the array and running for a specified length.
+ */
+static void init_sgs_pages(struct scatterlist **sg, struct page **pages,
+ int dpos, int dlen, u8 *pad)
+{
+ int idx = dpos >> PAGE_SHIFT;
+ int off = offset_in_page(dpos);
+ int resid = dlen;
+
+ do {
+ int len = min(resid, (int)PAGE_SIZE - off);
+
+ sg_set_page(*sg, pages[idx], len, off);
+ *sg = sg_next(*sg);
+ off = 0;
+ ++idx;
+ resid -= len;
+ } while (resid);
+
+ if (need_padding(dlen)) {
+ sg_set_buf(*sg, pad, padding_len(dlen));
+ *sg = sg_next(*sg);
+ }
+}
+
static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
u8 *front_pad, u8 *middle_pad, u8 *data_pad,
- void *epilogue, bool add_tag)
+ void *epilogue, struct page **pages, int dpos,
+ bool add_tag)
{
struct ceph_msg_data_cursor cursor;
struct scatterlist *cur_sg;
+ int dlen = data_len(msg);
int sg_cnt;
int ret;
@@ -986,9 +1024,15 @@ static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
if (middle_len(msg))
sg_cnt += calc_sg_cnt(msg->middle->vec.iov_base,
middle_len(msg));
- if (data_len(msg)) {
- ceph_msg_data_cursor_init(&cursor, msg, data_len(msg));
- sg_cnt += calc_sg_cnt_cursor(&cursor);
+ if (dlen) {
+ if (pages) {
+ sg_cnt += calc_pages_for(dpos, dlen);
+ if (need_padding(dlen))
+ sg_cnt++;
+ } else {
+ ceph_msg_data_cursor_init(&cursor, msg, dlen);
+ sg_cnt += calc_sg_cnt_cursor(&cursor);
+ }
}
ret = sg_alloc_table(sgt, sg_cnt, GFP_NOIO);
@@ -1002,9 +1046,13 @@ static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
if (middle_len(msg))
init_sgs(&cur_sg, msg->middle->vec.iov_base, middle_len(msg),
middle_pad);
- if (data_len(msg)) {
- ceph_msg_data_cursor_init(&cursor, msg, data_len(msg));
- init_sgs_cursor(&cur_sg, &cursor, data_pad);
+ if (dlen) {
+ if (pages) {
+ init_sgs_pages(&cur_sg, pages, dpos, dlen, data_pad);
+ } else {
+ ceph_msg_data_cursor_init(&cursor, msg, dlen);
+ init_sgs_cursor(&cur_sg, &cursor, data_pad);
+ }
}
WARN_ON(!sg_is_last(cur_sg));
@@ -1039,10 +1087,53 @@ static int decrypt_control_remainder(struct ceph_connection *con)
padded_len(rem_len) + CEPH_GCM_TAG_LEN);
}
+/* Process sparse read data that lives in a buffer */
+static int process_v2_sparse_read(struct ceph_connection *con,
+ struct page **pages, int spos)
+{
+ struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
+ int ret;
+
+ for (;;) {
+ char *buf = NULL;
+
+ ret = con->ops->sparse_read(con, cursor, &buf);
+ if (ret <= 0)
+ return ret;
+
+ dout("%s: sparse_read return %x buf %p\n", __func__, ret, buf);
+
+ do {
+ int idx = spos >> PAGE_SHIFT;
+ int soff = offset_in_page(spos);
+ struct page *spage = con->v2.in_enc_pages[idx];
+ int len = min_t(int, ret, PAGE_SIZE - soff);
+
+ if (buf) {
+ memcpy_from_page(buf, spage, soff, len);
+ buf += len;
+ } else {
+ struct bio_vec bv;
+
+ get_bvec_at(cursor, &bv);
+ len = min_t(int, len, bv.bv_len);
+ memcpy_page(bv.bv_page, bv.bv_offset,
+ spage, soff, len);
+ ceph_msg_data_advance(cursor, len);
+ }
+ spos += len;
+ ret -= len;
+ } while (ret);
+ }
+}
+
static int decrypt_tail(struct ceph_connection *con)
{
struct sg_table enc_sgt = {};
struct sg_table sgt = {};
+ struct page **pages = NULL;
+ bool sparse = con->in_msg->sparse_read;
+ int dpos = 0;
int tail_len;
int ret;
@@ -1053,9 +1144,14 @@ static int decrypt_tail(struct ceph_connection *con)
if (ret)
goto out;
+ if (sparse) {
+ dpos = padded_len(front_len(con->in_msg) + padded_len(middle_len(con->in_msg)));
+ pages = con->v2.in_enc_pages;
+ }
+
ret = setup_message_sgs(&sgt, con->in_msg, FRONT_PAD(con->v2.in_buf),
- MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf),
- con->v2.in_buf, true);
+ MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf),
+ con->v2.in_buf, pages, dpos, true);
if (ret)
goto out;
@@ -1065,6 +1161,12 @@ static int decrypt_tail(struct ceph_connection *con)
if (ret)
goto out;
+ if (sparse && data_len(con->in_msg)) {
+ ret = process_v2_sparse_read(con, con->v2.in_enc_pages, dpos);
+ if (ret)
+ goto out;
+ }
+
WARN_ON(!con->v2.in_enc_page_cnt);
ceph_release_page_vector(con->v2.in_enc_pages,
con->v2.in_enc_page_cnt);
@@ -1588,7 +1690,7 @@ static int prepare_message_secure(struct ceph_connection *con)
encode_epilogue_secure(con, false);
ret = setup_message_sgs(&sgt, con->out_msg, zerop, zerop, zerop,
- &con->v2.out_epil, false);
+ &con->v2.out_epil, NULL, 0, false);
if (ret)
goto out;
@@ -1825,6 +1927,123 @@ static void prepare_read_data_cont(struct ceph_connection *con)
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
}
+static int prepare_sparse_read_cont(struct ceph_connection *con)
+{
+ int ret;
+ struct bio_vec bv;
+ char *buf = NULL;
+ struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
+
+ WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT);
+
+ if (iov_iter_is_bvec(&con->v2.in_iter)) {
+ if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+ con->in_data_crc = crc32c(con->in_data_crc,
+ page_address(con->bounce_page),
+ con->v2.in_bvec.bv_len);
+ get_bvec_at(cursor, &bv);
+ memcpy_to_page(bv.bv_page, bv.bv_offset,
+ page_address(con->bounce_page),
+ con->v2.in_bvec.bv_len);
+ } else {
+ con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
+ con->v2.in_bvec.bv_page,
+ con->v2.in_bvec.bv_offset,
+ con->v2.in_bvec.bv_len);
+ }
+
+ ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len);
+ cursor->sr_resid -= con->v2.in_bvec.bv_len;
+ dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__,
+ con->v2.in_bvec.bv_len, cursor->sr_resid);
+ WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid);
+ if (cursor->sr_resid) {
+ get_bvec_at(cursor, &bv);
+ if (bv.bv_len > cursor->sr_resid)
+ bv.bv_len = cursor->sr_resid;
+ if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+ bv.bv_page = con->bounce_page;
+ bv.bv_offset = 0;
+ }
+ set_in_bvec(con, &bv);
+ con->v2.data_len_remain -= bv.bv_len;
+ return 0;
+ }
+ } else if (iov_iter_is_kvec(&con->v2.in_iter)) {
+ /* On first call, we have no kvec so don't compute crc */
+ if (con->v2.in_kvec_cnt) {
+ WARN_ON_ONCE(con->v2.in_kvec_cnt > 1);
+ con->in_data_crc = crc32c(con->in_data_crc,
+ con->v2.in_kvecs[0].iov_base,
+ con->v2.in_kvecs[0].iov_len);
+ }
+ } else {
+ return -EIO;
+ }
+
+ /* get next extent */
+ ret = con->ops->sparse_read(con, cursor, &buf);
+ if (ret <= 0) {
+ if (ret < 0)
+ return ret;
+
+ reset_in_kvecs(con);
+ add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
+ con->v2.in_state = IN_S_HANDLE_EPILOGUE;
+ return 0;
+ }
+
+ if (buf) {
+ /* receive into buffer */
+ reset_in_kvecs(con);
+ add_in_kvec(con, buf, ret);
+ con->v2.data_len_remain -= ret;
+ return 0;
+ }
+
+ if (ret > cursor->total_resid) {
+ pr_warn("%s: ret 0x%x total_resid 0x%zx resid 0x%zx\n",
+ __func__, ret, cursor->total_resid, cursor->resid);
+ return -EIO;
+ }
+ get_bvec_at(cursor, &bv);
+ if (bv.bv_len > cursor->sr_resid)
+ bv.bv_len = cursor->sr_resid;
+ if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+ if (unlikely(!con->bounce_page)) {
+ con->bounce_page = alloc_page(GFP_NOIO);
+ if (!con->bounce_page) {
+ pr_err("failed to allocate bounce page\n");
+ return -ENOMEM;
+ }
+ }
+
+ bv.bv_page = con->bounce_page;
+ bv.bv_offset = 0;
+ }
+ set_in_bvec(con, &bv);
+ con->v2.data_len_remain -= ret;
+ return ret;
+}
+
+static int prepare_sparse_read_data(struct ceph_connection *con)
+{
+ struct ceph_msg *msg = con->in_msg;
+
+ dout("%s: starting sparse read\n", __func__);
+
+ if (WARN_ON_ONCE(!con->ops->sparse_read))
+ return -EOPNOTSUPP;
+
+ if (!con_secure(con))
+ con->in_data_crc = -1;
+
+ reset_in_kvecs(con);
+ con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT;
+ con->v2.data_len_remain = data_len(msg);
+ return prepare_sparse_read_cont(con);
+}
+
static int prepare_read_tail_plain(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
@@ -1845,7 +2064,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
}
if (data_len(msg)) {
- con->v2.in_state = IN_S_PREPARE_READ_DATA;
+ if (msg->sparse_read)
+ con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
+ else
+ con->v2.in_state = IN_S_PREPARE_READ_DATA;
} else {
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
@@ -2898,6 +3120,12 @@ static int populate_in_iter(struct ceph_connection *con)
prepare_read_enc_page(con);
ret = 0;
break;
+ case IN_S_PREPARE_SPARSE_DATA:
+ ret = prepare_sparse_read_data(con);
+ break;
+ case IN_S_PREPARE_SPARSE_DATA_CONT:
+ ret = prepare_sparse_read_cont(con);
+ break;
case IN_S_HANDLE_EPILOGUE:
ret = handle_epilogue(con);
break;
@@ -3489,6 +3717,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con)
con->v2.in_state = IN_S_FINISH_SKIP;
}
+static void revoke_at_prepare_sparse_data(struct ceph_connection *con)
+{
+ int resid; /* current piece of data */
+ int remaining;
+
+ WARN_ON(con_secure(con));
+ WARN_ON(!data_len(con->in_msg));
+ WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
+ resid = iov_iter_count(&con->v2.in_iter);
+ dout("%s con %p resid %d\n", __func__, con, resid);
+
+ remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain;
+ con->v2.in_iter.count -= resid;
+ set_in_skip(con, resid + remaining);
+ con->v2.in_state = IN_S_FINISH_SKIP;
+}
+
static void revoke_at_handle_epilogue(struct ceph_connection *con)
{
int resid;
@@ -3505,6 +3750,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con)
void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
{
switch (con->v2.in_state) {
+ case IN_S_PREPARE_SPARSE_DATA:
case IN_S_PREPARE_READ_DATA:
revoke_at_prepare_read_data(con);
break;
@@ -3514,6 +3760,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
case IN_S_PREPARE_READ_ENC_PAGE:
revoke_at_prepare_read_enc_page(con);
break;
+ case IN_S_PREPARE_SPARSE_DATA_CONT:
+ revoke_at_prepare_sparse_data(con);
+ break;
case IN_S_HANDLE_EPILOGUE:
revoke_at_handle_epilogue(con);
break;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 658a6f2320cf..d3a759e052c8 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -171,6 +171,13 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
osd_data->num_bvecs = num_bvecs;
}
+static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
+ struct iov_iter *iter)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
+ osd_data->iter = *iter;
+}
+
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
@@ -264,6 +271,22 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
+/**
+ * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
+ * @osd_req: The request to set up
+ * @which: Index of the operation in which to set the iter
+ * @iter: The buffer iterator
+ */
+void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
+ unsigned int which, struct iov_iter *iter)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_iter_init(osd_data, iter);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_iter);
+
static void osd_req_op_cls_request_info_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -346,6 +369,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
#endif /* CONFIG_BLOCK */
case CEPH_OSD_DATA_TYPE_BVECS:
return osd_data->bvec_pos.iter.bi_size;
+ case CEPH_OSD_DATA_TYPE_ITER:
+ return iov_iter_count(&osd_data->iter);
default:
WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
return 0;
@@ -376,8 +401,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
switch (op->op) {
case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
+ kfree(op->extent.sparse_ext);
ceph_osd_data_release(&op->extent.osd_data);
break;
case CEPH_OSD_OP_CALL:
@@ -669,6 +696,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
/* reply */
case CEPH_OSD_OP_STAT:
case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_LIST_WATCHERS:
*num_reply_data_items += 1;
break;
@@ -738,7 +766,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
- opcode != CEPH_OSD_OP_TRUNCATE);
+ opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
op->extent.offset = offset;
op->extent.length = length;
@@ -951,6 +979,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
#endif
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
+ ceph_msg_data_add_iter(msg, &osd_data->iter);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
}
@@ -963,6 +993,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
case CEPH_OSD_OP_STAT:
break;
case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
case CEPH_OSD_OP_ZERO:
@@ -1017,6 +1048,10 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
dst->copy_from.src_fadvise_flags =
cpu_to_le32(src->copy_from.src_fadvise_flags);
break;
+ case CEPH_OSD_OP_ASSERT_VER:
+ dst->assert_ver.unused = cpu_to_le64(0);
+ dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
+ break;
default:
pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op));
@@ -1059,7 +1094,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
- opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+ opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
+ opcode != CEPH_OSD_OP_SPARSE_READ);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
@@ -1100,15 +1136,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (flags & CEPH_OSD_FLAG_WRITE)
req->r_data_offset = off;
- if (num_ops > 1)
+ if (num_ops > 1) {
+ int num_req_ops, num_rep_ops;
+
/*
- * This is a special case for ceph_writepages_start(), but it
- * also covers ceph_uninline_data(). If more multi-op request
- * use cases emerge, we will need a separate helper.
+ * If this is a multi-op write request, assume that we'll need
+ * request ops. If it's a multi-op read then assume we'll need
+ * reply ops. Anything else and call it -EINVAL.
*/
- r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
- else
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ num_req_ops = num_ops;
+ num_rep_ops = 0;
+ } else if (flags & CEPH_OSD_FLAG_READ) {
+ num_req_ops = 0;
+ num_rep_ops = num_ops;
+ } else {
+ r = -EINVAL;
+ goto fail;
+ }
+
+ r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
+ num_rep_ops);
+ } else {
r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+ }
if (r)
goto fail;
@@ -1120,6 +1171,18 @@ fail:
}
EXPORT_SYMBOL(ceph_osdc_new_request);
+int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
+{
+ op->extent.sparse_ext_cnt = cnt;
+ op->extent.sparse_ext = kmalloc_array(cnt,
+ sizeof(*op->extent.sparse_ext),
+ GFP_NOFS);
+ if (!op->extent.sparse_ext)
+ return -ENOMEM;
+ return 0;
+}
+EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);
+
/*
* We keep osd requests in an rbtree, sorted by ->r_tid.
*/
@@ -1177,6 +1240,7 @@ static void osd_init(struct ceph_osd *osd)
{
refcount_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
+ spin_lock_init(&osd->o_requests_lock);
osd->o_requests = RB_ROOT;
osd->o_linger_requests = RB_ROOT;
osd->o_backoff_mappings = RB_ROOT;
@@ -1187,6 +1251,13 @@ static void osd_init(struct ceph_osd *osd)
mutex_init(&osd->lock);
}
+static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
+{
+ kfree(sr->sr_extent);
+ memset(sr, '\0', sizeof(*sr));
+ sr->sr_state = CEPH_SPARSE_READ_HDR;
+}
+
static void osd_cleanup(struct ceph_osd *osd)
{
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
@@ -1197,6 +1268,8 @@ static void osd_cleanup(struct ceph_osd *osd)
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
+ ceph_init_sparse_read(&osd->o_sparse_read);
+
if (osd->o_auth.authorizer) {
WARN_ON(osd_homeless(osd));
ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
@@ -1216,6 +1289,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
osd_init(osd);
osd->o_osdc = osdc;
osd->o_osd = onum;
+ osd->o_sparse_op_idx = -1;
+
+ ceph_init_sparse_read(&osd->o_sparse_read);
ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
@@ -1406,7 +1482,9 @@ static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
atomic_inc(&osd->o_osdc->num_homeless);
get_osd(osd);
+ spin_lock(&osd->o_requests_lock);
insert_request(&osd->o_requests, req);
+ spin_unlock(&osd->o_requests_lock);
req->r_osd = osd;
}
@@ -1418,7 +1496,9 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
req, req->r_tid);
req->r_osd = NULL;
+ spin_lock(&osd->o_requests_lock);
erase_request(&osd->o_requests, req);
+ spin_unlock(&osd->o_requests_lock);
put_osd(osd);
if (!osd_homeless(osd))
@@ -2016,6 +2096,7 @@ static void setup_request_data(struct ceph_osd_request *req)
&op->raw_data_in);
break;
case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ:
ceph_osdc_msg_data_add(reply_msg,
&op->extent.osd_data);
break;
@@ -2435,8 +2516,10 @@ static void finish_request(struct ceph_osd_request *req)
req->r_end_latency = ktime_get();
- if (req->r_osd)
+ if (req->r_osd) {
+ ceph_init_sparse_read(&req->r_osd->o_sparse_read);
unlink_request(req->r_osd, req);
+ }
atomic_dec(&osdc->num_requests);
/*
@@ -3795,6 +3878,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
* one (type of) reply back.
*/
WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
+ req->r_version = m.user_version;
req->r_result = m.result ?: data_len;
finish_request(req);
mutex_unlock(&osd->lock);
@@ -5348,6 +5432,24 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
ceph_msg_put(msg);
}
+/* How much sparse data was requested? */
+static u64 sparse_data_requested(struct ceph_osd_request *req)
+{
+ u64 len = 0;
+
+ if (req->r_flags & CEPH_OSD_FLAG_READ) {
+ int i;
+
+ for (i = 0; i < req->r_num_ops; ++i) {
+ struct ceph_osd_req_op *op = &req->r_ops[i];
+
+ if (op->op == CEPH_OSD_OP_SPARSE_READ)
+ len += op->extent.length;
+ }
+ }
+ return len;
+}
+
/*
* Lookup and return message for incoming reply. Don't try to do
* anything about a larger than preallocated data portion of the
@@ -5364,6 +5466,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
int front_len = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
u64 tid = le64_to_cpu(hdr->tid);
+ u64 srlen;
down_read(&osdc->lock);
if (!osd_registered(osd)) {
@@ -5396,7 +5499,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
req->r_reply = m;
}
- if (data_len > req->r_reply->data_length) {
+ srlen = sparse_data_requested(req);
+ if (!srlen && data_len > req->r_reply->data_length) {
pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
__func__, osd->o_osd, req->r_tid, data_len,
req->r_reply->data_length);
@@ -5406,6 +5510,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m = ceph_msg_get(req->r_reply);
+ m->sparse_read = (bool)srlen;
+
dout("get_reply tid %lld %p\n", tid, m);
out_unlock_session:
@@ -5638,9 +5744,217 @@ static int osd_check_message_signature(struct ceph_msg *msg)
return ceph_auth_check_message_signature(auth, msg);
}
+static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len,
+ bool zero)
+{
+ while (len) {
+ struct page *page;
+ size_t poff, plen;
+
+ page = ceph_msg_data_next(cursor, &poff, &plen);
+ if (plen > len)
+ plen = len;
+ if (zero)
+ zero_user_segment(page, poff, poff + plen);
+ len -= plen;
+ ceph_msg_data_advance(cursor, plen);
+ }
+}
+
+static int prep_next_sparse_read(struct ceph_connection *con,
+ struct ceph_msg_data_cursor *cursor)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_sparse_read *sr = &o->o_sparse_read;
+ struct ceph_osd_request *req;
+ struct ceph_osd_req_op *op;
+
+ spin_lock(&o->o_requests_lock);
+ req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
+ if (!req) {
+ spin_unlock(&o->o_requests_lock);
+ return -EBADR;
+ }
+
+ if (o->o_sparse_op_idx < 0) {
+ u64 srlen = sparse_data_requested(req);
+
+ dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
+ __func__, o->o_osd, srlen);
+ ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
+ } else {
+ u64 end;
+
+ op = &req->r_ops[o->o_sparse_op_idx];
+
+ WARN_ON_ONCE(op->extent.sparse_ext);
+
+ /* hand back buffer we took earlier */
+ op->extent.sparse_ext = sr->sr_extent;
+ sr->sr_extent = NULL;
+ op->extent.sparse_ext_cnt = sr->sr_count;
+ sr->sr_ext_len = 0;
+ dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
+ __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid);
+ /* Advance to end of data for this operation */
+ end = ceph_sparse_ext_map_end(op);
+ if (end < sr->sr_req_len)
+ advance_cursor(cursor, sr->sr_req_len - end, false);
+ }
+
+ ceph_init_sparse_read(sr);
+
+ /* find next op in this request (if any) */
+ while (++o->o_sparse_op_idx < req->r_num_ops) {
+ op = &req->r_ops[o->o_sparse_op_idx];
+ if (op->op == CEPH_OSD_OP_SPARSE_READ)
+ goto found;
+ }
+
+ /* reset for next sparse read request */
+ spin_unlock(&o->o_requests_lock);
+ o->o_sparse_op_idx = -1;
+ return 0;
+found:
+ sr->sr_req_off = op->extent.offset;
+ sr->sr_req_len = op->extent.length;
+ sr->sr_pos = sr->sr_req_off;
+ dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
+ o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
+
+ /* hand off request's sparse extent map buffer */
+ sr->sr_ext_len = op->extent.sparse_ext_cnt;
+ op->extent.sparse_ext_cnt = 0;
+ sr->sr_extent = op->extent.sparse_ext;
+ op->extent.sparse_ext = NULL;
+
+ spin_unlock(&o->o_requests_lock);
+ return 1;
+}
+
+#ifdef __BIG_ENDIAN
+static inline void convert_extent_map(struct ceph_sparse_read *sr)
+{
+ int i;
+
+ for (i = 0; i < sr->sr_count; i++) {
+ struct ceph_sparse_extent *ext = &sr->sr_extent[i];
+
+ ext->off = le64_to_cpu((__force __le64)ext->off);
+ ext->len = le64_to_cpu((__force __le64)ext->len);
+ }
+}
+#else
+static inline void convert_extent_map(struct ceph_sparse_read *sr)
+{
+}
+#endif
+
+#define MAX_EXTENTS 4096
+
+static int osd_sparse_read(struct ceph_connection *con,
+ struct ceph_msg_data_cursor *cursor,
+ char **pbuf)
+{
+ struct ceph_osd *o = con->private;
+ struct ceph_sparse_read *sr = &o->o_sparse_read;
+ u32 count = sr->sr_count;
+ u64 eoff, elen;
+ int ret;
+
+ switch (sr->sr_state) {
+ case CEPH_SPARSE_READ_HDR:
+next_op:
+ ret = prep_next_sparse_read(con, cursor);
+ if (ret <= 0)
+ return ret;
+
+ /* number of extents */
+ ret = sizeof(sr->sr_count);
+ *pbuf = (char *)&sr->sr_count;
+ sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
+ break;
+ case CEPH_SPARSE_READ_EXTENTS:
+ /* Convert sr_count to host-endian */
+ count = le32_to_cpu((__force __le32)sr->sr_count);
+ sr->sr_count = count;
+ dout("[%d] got %u extents\n", o->o_osd, count);
+
+ if (count > 0) {
+ if (!sr->sr_extent || count > sr->sr_ext_len) {
+ /*
+ * Apply a hard cap to the number of extents.
+ * If we have more, assume something is wrong.
+ */
+ if (count > MAX_EXTENTS) {
+ dout("%s: OSD returned 0x%x extents in a single reply!\n",
+ __func__, count);
+ return -EREMOTEIO;
+ }
+
+ /* no extent array provided, or too short */
+ kfree(sr->sr_extent);
+ sr->sr_extent = kmalloc_array(count,
+ sizeof(*sr->sr_extent),
+ GFP_NOIO);
+ if (!sr->sr_extent)
+ return -ENOMEM;
+ sr->sr_ext_len = count;
+ }
+ ret = count * sizeof(*sr->sr_extent);
+ *pbuf = (char *)sr->sr_extent;
+ sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
+ break;
+ }
+ /* No extents? Read data len */
+ fallthrough;
+ case CEPH_SPARSE_READ_DATA_LEN:
+ convert_extent_map(sr);
+ ret = sizeof(sr->sr_datalen);
+ *pbuf = (char *)&sr->sr_datalen;
+ sr->sr_state = CEPH_SPARSE_READ_DATA;
+ break;
+ case CEPH_SPARSE_READ_DATA:
+ if (sr->sr_index >= count) {
+ sr->sr_state = CEPH_SPARSE_READ_HDR;
+ goto next_op;
+ }
+
+ eoff = sr->sr_extent[sr->sr_index].off;
+ elen = sr->sr_extent[sr->sr_index].len;
+
+ dout("[%d] ext %d off 0x%llx len 0x%llx\n",
+ o->o_osd, sr->sr_index, eoff, elen);
+
+ if (elen > INT_MAX) {
+ dout("Sparse read extent length too long (0x%llx)\n",
+ elen);
+ return -EREMOTEIO;
+ }
+
+ /* zero out anything from sr_pos to start of extent */
+ if (sr->sr_pos < eoff)
+ advance_cursor(cursor, eoff - sr->sr_pos, true);
+
+ /* Set position to end of extent */
+ sr->sr_pos = eoff + elen;
+
+ /* send back the new length and nullify the ptr */
+ cursor->sr_resid = elen;
+ ret = elen;
+ *pbuf = NULL;
+
+ /* Bump the array index */
+ ++sr->sr_index;
+ break;
+ }
+ return ret;
+}
+
static const struct ceph_connection_operations osd_con_ops = {
.get = osd_get_con,
.put = osd_put_con,
+ .sparse_read = osd_sparse_read,
.alloc_msg = osd_alloc_msg,
.dispatch = osd_dispatch,
.fault = osd_fault,