summaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2023-09-06 22:10:15 +0300
committerLinus Torvalds <torvalds@linux-foundation.org>2023-09-06 22:10:15 +0300
commit7ba2090ca64ea1aa435744884124387db1fac70f (patch)
treeed4ea24f4cfed5f28b9c8cdf99dbdf7df6a221ae /fs/ceph/mds_client.c
parent744a759492b5c57ff24a6e8aabe47b17ad8ee964 (diff)
parentce0d5bd3a6c176f9a3bf867624a07119dd4d0878 (diff)
downloadlinux-7ba2090ca64ea1aa435744884124387db1fac70f.tar.xz
Merge tag 'ceph-for-6.6-rc1' of https://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov: "Mixed with some fixes and cleanups, this brings in reasonably complete fscrypt support to CephFS! The list of things which don't work with encryption should be fairly short, mostly around the edges: fallocate (not supported well in CephFS to begin with), copy_file_range (requires re-encryption), non-default striping patterns. This was a multi-year effort principally by Jeff Layton with assistance from Xiubo Li, Luís Henriques and others, including several dependant changes in the MDS, netfs helper library and fscrypt framework itself" * tag 'ceph-for-6.6-rc1' of https://github.com/ceph/ceph-client: (53 commits) ceph: make num_fwd and num_retry to __u32 ceph: make members in struct ceph_mds_request_args_ext a union rbd: use list_for_each_entry() helper libceph: do not include crypto/algapi.h ceph: switch ceph_lookup/atomic_open() to use new fscrypt helper ceph: fix updating i_truncate_pagecache_size for fscrypt ceph: wait for OSD requests' callbacks to finish when unmounting ceph: drop messages from MDS when unmounting ceph: update documentation regarding snapshot naming limitations ceph: prevent snapshot creation in encrypted locked directories ceph: add support for encrypted snapshot names ceph: invalidate pages when doing direct/sync writes ceph: plumb in decryption during reads ceph: add encryption support to writepage and writepages ceph: add read/modify/write to ceph_sync_write ceph: align data in pages in ceph_sync_write ceph: don't use special DIO path for encrypted inodes ceph: add truncate size handling support for fscrypt ceph: add object version support for sync read libceph: allow ceph_osdc_new_request to accept a multi-op read ...
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c676
1 files changed, 520 insertions, 156 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5fb367b1d4b0..615db141b6c4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -15,6 +15,7 @@
#include "super.h"
#include "mds_client.h"
+#include "crypto.h"
#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
@@ -184,8 +185,54 @@ static int parse_reply_info_in(void **p, void *end,
info->rsnaps = 0;
}
+ if (struct_v >= 5) {
+ u32 alen;
+
+ ceph_decode_32_safe(p, end, alen, bad);
+
+ while (alen--) {
+ u32 len;
+
+ /* key */
+ ceph_decode_32_safe(p, end, len, bad);
+ ceph_decode_skip_n(p, end, len, bad);
+ /* value */
+ ceph_decode_32_safe(p, end, len, bad);
+ ceph_decode_skip_n(p, end, len, bad);
+ }
+ }
+
+ /* fscrypt flag -- ignore */
+ if (struct_v >= 6)
+ ceph_decode_skip_8(p, end, bad);
+
+ info->fscrypt_auth = NULL;
+ info->fscrypt_auth_len = 0;
+ info->fscrypt_file = NULL;
+ info->fscrypt_file_len = 0;
+ if (struct_v >= 7) {
+ ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
+ if (info->fscrypt_auth_len) {
+ info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
+ GFP_KERNEL);
+ if (!info->fscrypt_auth)
+ return -ENOMEM;
+ ceph_decode_copy_safe(p, end, info->fscrypt_auth,
+ info->fscrypt_auth_len, bad);
+ }
+ ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
+ if (info->fscrypt_file_len) {
+ info->fscrypt_file = kmalloc(info->fscrypt_file_len,
+ GFP_KERNEL);
+ if (!info->fscrypt_file)
+ return -ENOMEM;
+ ceph_decode_copy_safe(p, end, info->fscrypt_file,
+ info->fscrypt_file_len, bad);
+ }
+ }
*p = end;
} else {
+ /* legacy (unversioned) struct */
if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
@@ -263,27 +310,47 @@ bad:
static int parse_reply_info_lease(void **p, void *end,
struct ceph_mds_reply_lease **lease,
- u64 features)
+ u64 features, u32 *altname_len, u8 **altname)
{
+ u8 struct_v;
+ u32 struct_len;
+ void *lend;
+
if (features == (u64)-1) {
- u8 struct_v, struct_compat;
- u32 struct_len;
+ u8 struct_compat;
+
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
+
/* struct_v is expected to be >= 1. we only understand
* encoding whose struct_compat == 1. */
if (!struct_v || struct_compat != 1)
goto bad;
+
ceph_decode_32_safe(p, end, struct_len, bad);
- ceph_decode_need(p, end, struct_len, bad);
- end = *p + struct_len;
+ } else {
+ struct_len = sizeof(**lease);
+ *altname_len = 0;
+ *altname = NULL;
}
- ceph_decode_need(p, end, sizeof(**lease), bad);
+ lend = *p + struct_len;
+ ceph_decode_need(p, end, struct_len, bad);
*lease = *p;
*p += sizeof(**lease);
- if (features == (u64)-1)
- *p = end;
+
+ if (features == (u64)-1) {
+ if (struct_v >= 2) {
+ ceph_decode_32_safe(p, end, *altname_len, bad);
+ ceph_decode_need(p, end, *altname_len, bad);
+ *altname = *p;
+ *p += *altname_len;
+ } else {
+ *altname = NULL;
+ *altname_len = 0;
+ }
+ }
+ *p = lend;
return 0;
bad:
return -EIO;
@@ -313,7 +380,8 @@ static int parse_reply_info_trace(void **p, void *end,
info->dname = *p;
*p += info->dname_len;
- err = parse_reply_info_lease(p, end, &info->dlease, features);
+ err = parse_reply_info_lease(p, end, &info->dlease, features,
+ &info->altname_len, &info->altname);
if (err < 0)
goto out_bad;
}
@@ -339,9 +407,10 @@ out_bad:
* parse readdir results
*/
static int parse_reply_info_readdir(void **p, void *end,
- struct ceph_mds_reply_info_parsed *info,
- u64 features)
+ struct ceph_mds_request *req,
+ u64 features)
{
+ struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
u32 num, i = 0;
int err;
@@ -371,18 +440,87 @@ static int parse_reply_info_readdir(void **p, void *end,
info->dir_nr = num;
while (num) {
+ struct inode *inode = d_inode(req->r_dentry);
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
+ struct fscrypt_str tname = FSTR_INIT(NULL, 0);
+ struct fscrypt_str oname = FSTR_INIT(NULL, 0);
+ struct ceph_fname fname;
+ u32 altname_len, _name_len;
+ u8 *altname, *_name;
+
/* dentry */
- ceph_decode_32_safe(p, end, rde->name_len, bad);
- ceph_decode_need(p, end, rde->name_len, bad);
- rde->name = *p;
- *p += rde->name_len;
- dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
+ ceph_decode_32_safe(p, end, _name_len, bad);
+ ceph_decode_need(p, end, _name_len, bad);
+ _name = *p;
+ *p += _name_len;
+ dout("parsed dir dname '%.*s'\n", _name_len, _name);
+
+ if (info->hash_order)
+ rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+ _name, _name_len);
/* dentry lease */
- err = parse_reply_info_lease(p, end, &rde->lease, features);
+ err = parse_reply_info_lease(p, end, &rde->lease, features,
+ &altname_len, &altname);
if (err)
goto out_bad;
+
+ /*
+ * Try to dencrypt the dentry names and update them
+ * in the ceph_mds_reply_dir_entry struct.
+ */
+ fname.dir = inode;
+ fname.name = _name;
+ fname.name_len = _name_len;
+ fname.ctext = altname;
+ fname.ctext_len = altname_len;
+ /*
+ * The _name_len maybe larger than altname_len, such as
+ * when the human readable name length is in range of
+ * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
+ * then the copy in ceph_fname_to_usr will corrupt the
+ * data if there has no encryption key.
+ *
+ * Just set the no_copy flag and then if there has no
+ * encryption key the oname.name will be assigned to
+ * _name always.
+ */
+ fname.no_copy = true;
+ if (altname_len == 0) {
+ /*
+ * Set tname to _name, and this will be used
+ * to do the base64_decode in-place. It's
+ * safe because the decoded string should
+ * always be shorter, which is 3/4 of origin
+ * string.
+ */
+ tname.name = _name;
+
+ /*
+ * Set oname to _name too, and this will be
+ * used to do the dencryption in-place.
+ */
+ oname.name = _name;
+ oname.len = _name_len;
+ } else {
+ /*
+ * This will do the decryption only in-place
+ * from altname cryptext directly.
+ */
+ oname.name = altname;
+ oname.len = altname_len;
+ }
+ rde->is_nokey = false;
+ err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
+ if (err) {
+ pr_err("%s unable to decode %.*s, got %d\n", __func__,
+ _name_len, _name, err);
+ goto out_bad;
+ }
+ rde->name = oname.name;
+ rde->name_len = oname.len;
+
/* inode */
err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
@@ -581,15 +719,16 @@ bad:
* parse extra results
*/
static int parse_reply_info_extra(void **p, void *end,
- struct ceph_mds_reply_info_parsed *info,
+ struct ceph_mds_request *req,
u64 features, struct ceph_mds_session *s)
{
+ struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
u32 op = le32_to_cpu(info->head->op);
if (op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features);
else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
- return parse_reply_info_readdir(p, end, info, features);
+ return parse_reply_info_readdir(p, end, req, features);
else if (op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features, s);
else if (op == CEPH_MDS_OP_GETVXATTR)
@@ -602,9 +741,9 @@ static int parse_reply_info_extra(void **p, void *end,
* parse entire mds reply
*/
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
- struct ceph_mds_reply_info_parsed *info,
- u64 features)
+ struct ceph_mds_request *req, u64 features)
{
+ struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
void *p, *end;
u32 len;
int err;
@@ -626,7 +765,7 @@ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) {
ceph_decode_need(&p, end, len, bad);
- err = parse_reply_info_extra(&p, p+len, info, features, s);
+ err = parse_reply_info_extra(&p, p+len, req, features, s);
if (err < 0)
goto out_bad;
}
@@ -651,8 +790,21 @@ out_bad:
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
+ int i;
+
+ kfree(info->diri.fscrypt_auth);
+ kfree(info->diri.fscrypt_file);
+ kfree(info->targeti.fscrypt_auth);
+ kfree(info->targeti.fscrypt_file);
if (!info->dir_entries)
return;
+
+ for (i = 0; i < info->dir_nr; i++) {
+ struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
+
+ kfree(rde->inode.fscrypt_auth);
+ kfree(rde->inode.fscrypt_file);
+ }
free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
@@ -945,6 +1097,7 @@ void ceph_mdsc_release_request(struct kref *kref)
iput(req->r_parent);
}
iput(req->r_target_inode);
+ iput(req->r_new_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
@@ -965,6 +1118,8 @@ void ceph_mdsc_release_request(struct kref *kref)
put_cred(req->r_cred);
if (req->r_pagelist)
ceph_pagelist_release(req->r_pagelist);
+ kfree(req->r_fscrypt_auth);
+ kfree(req->r_altname);
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
WARN_ON_ONCE(!list_empty(&req->r_wait));
@@ -2373,20 +2528,90 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
return mdsc->oldest_tid;
}
-/*
- * Build a dentry's path. Allocate on heap; caller must kfree. Based
- * on build_path_from_dentry in fs/cifs/dir.c.
+#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
+static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
+{
+ struct inode *dir = req->r_parent;
+ struct dentry *dentry = req->r_dentry;
+ u8 *cryptbuf = NULL;
+ u32 len = 0;
+ int ret = 0;
+
+ /* only encode if we have parent and dentry */
+ if (!dir || !dentry)
+ goto success;
+
+ /* No-op unless this is encrypted */
+ if (!IS_ENCRYPTED(dir))
+ goto success;
+
+ ret = ceph_fscrypt_prepare_readdir(dir);
+ if (ret < 0)
+ return ERR_PTR(ret);
+
+ /* No key? Just ignore it. */
+ if (!fscrypt_has_encryption_key(dir))
+ goto success;
+
+ if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX,
+ &len)) {
+ WARN_ON_ONCE(1);
+ return ERR_PTR(-ENAMETOOLONG);
+ }
+
+ /* No need to append altname if name is short enough */
+ if (len <= CEPH_NOHASH_NAME_MAX) {
+ len = 0;
+ goto success;
+ }
+
+ cryptbuf = kmalloc(len, GFP_KERNEL);
+ if (!cryptbuf)
+ return ERR_PTR(-ENOMEM);
+
+ ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len);
+ if (ret) {
+ kfree(cryptbuf);
+ return ERR_PTR(ret);
+ }
+success:
+ *plen = len;
+ return cryptbuf;
+}
+#else
+static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
+{
+ *plen = 0;
+ return NULL;
+}
+#endif
+
+/**
+ * ceph_mdsc_build_path - build a path string to a given dentry
+ * @dentry: dentry to which path should be built
+ * @plen: returned length of string
+ * @pbase: returned base inode number
+ * @for_wire: is this path going to be sent to the MDS?
+ *
+ * Build a string that represents the path to the dentry. This is mostly called
+ * for two different purposes:
+ *
+ * 1) we need to build a path string to send to the MDS (for_wire == true)
+ * 2) we need a path string for local presentation (e.g. debugfs)
+ * (for_wire == false)
*
- * If @stop_on_nosnap, generate path relative to the first non-snapped
- * inode.
+ * The path is built in reverse, starting with the dentry. Walk back up toward
+ * the root, building the path until the first non-snapped inode is reached
+ * (for_wire) or the root inode is reached (!for_wire).
*
* Encode hidden .snap dirs as a double /, i.e.
* foo/.snap/bar -> foo//bar
*/
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
- int stop_on_nosnap)
+ int for_wire)
{
- struct dentry *temp;
+ struct dentry *cur;
+ struct inode *inode;
char *path;
int pos;
unsigned seq;
@@ -2403,34 +2628,72 @@ retry:
path[pos] = '\0';
seq = read_seqbegin(&rename_lock);
- rcu_read_lock();
- temp = dentry;
+ cur = dget(dentry);
for (;;) {
- struct inode *inode;
+ struct dentry *parent;
- spin_lock(&temp->d_lock);
- inode = d_inode(temp);
+ spin_lock(&cur->d_lock);
+ inode = d_inode(cur);
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
dout("build_path path+%d: %p SNAPDIR\n",
- pos, temp);
- } else if (stop_on_nosnap && inode && dentry != temp &&
+ pos, cur);
+ spin_unlock(&cur->d_lock);
+ parent = dget_parent(cur);
+ } else if (for_wire && inode && dentry != cur &&
ceph_snap(inode) == CEPH_NOSNAP) {
- spin_unlock(&temp->d_lock);
+ spin_unlock(&cur->d_lock);
pos++; /* get rid of any prepended '/' */
break;
+ } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
+ pos -= cur->d_name.len;
+ if (pos < 0) {
+ spin_unlock(&cur->d_lock);
+ break;
+ }
+ memcpy(path + pos, cur->d_name.name, cur->d_name.len);
+ spin_unlock(&cur->d_lock);
+ parent = dget_parent(cur);
} else {
- pos -= temp->d_name.len;
+ int len, ret;
+ char buf[NAME_MAX];
+
+ /*
+ * Proactively copy name into buf, in case we need to
+ * present it as-is.
+ */
+ memcpy(buf, cur->d_name.name, cur->d_name.len);
+ len = cur->d_name.len;
+ spin_unlock(&cur->d_lock);
+ parent = dget_parent(cur);
+
+ ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
+ if (ret < 0) {
+ dput(parent);
+ dput(cur);
+ return ERR_PTR(ret);
+ }
+
+ if (fscrypt_has_encryption_key(d_inode(parent))) {
+ len = ceph_encode_encrypted_fname(d_inode(parent),
+ cur, buf);
+ if (len < 0) {
+ dput(parent);
+ dput(cur);
+ return ERR_PTR(len);
+ }
+ }
+ pos -= len;
if (pos < 0) {
- spin_unlock(&temp->d_lock);
+ dput(parent);
break;
}
- memcpy(path + pos, temp->d_name.name, temp->d_name.len);
+ memcpy(path + pos, buf, len);
}
- spin_unlock(&temp->d_lock);
- temp = READ_ONCE(temp->d_parent);
+ dput(cur);
+ cur = parent;
/* Are we at the root? */
- if (IS_ROOT(temp))
+ if (IS_ROOT(cur))
break;
/* Are we out of buffer? */
@@ -2439,8 +2702,9 @@ retry:
path[pos] = '/';
}
- base = ceph_ino(d_inode(temp));
- rcu_read_unlock();
+ inode = d_inode(cur);
+ base = inode ? ceph_ino(inode) : 0;
+ dput(cur);
if (read_seqretry(&rename_lock, seq))
goto retry;
@@ -2450,8 +2714,8 @@ retry:
* A rename didn't occur, but somehow we didn't end up where
* we thought we would. Throw a warning and try again.
*/
- pr_warn("build_path did not end path lookup where "
- "expected, pos is %d\n", pos);
+ pr_warn("build_path did not end path lookup where expected (pos = %d)\n",
+ pos);
goto retry;
}
@@ -2471,7 +2735,8 @@ static int build_dentry_path(struct dentry *dentry, struct inode *dir,
rcu_read_lock();
if (!dir)
dir = d_inode_rcu(dentry->d_parent);
- if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
+ if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
+ !IS_ENCRYPTED(dir)) {
*pino = ceph_ino(dir);
rcu_read_unlock();
*ppath = dentry->d_name.name;
@@ -2539,8 +2804,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
return r;
}
-static void encode_timestamp_and_gids(void **p,
- const struct ceph_mds_request *req)
+static void encode_mclientrequest_tail(void **p,
+ const struct ceph_mds_request *req)
{
struct ceph_timespec ts;
int i;
@@ -2548,11 +2813,43 @@ static void encode_timestamp_and_gids(void **p,
ceph_encode_timespec64(&ts, &req->r_stamp);
ceph_encode_copy(p, &ts, sizeof(ts));
- /* gid_list */
+ /* v4: gid_list */
ceph_encode_32(p, req->r_cred->group_info->ngroups);
for (i = 0; i < req->r_cred->group_info->ngroups; i++)
ceph_encode_64(p, from_kgid(&init_user_ns,
req->r_cred->group_info->gid[i]));
+
+ /* v5: altname */
+ ceph_encode_32(p, req->r_altname_len);
+ ceph_encode_copy(p, req->r_altname, req->r_altname_len);
+
+ /* v6: fscrypt_auth and fscrypt_file */
+ if (req->r_fscrypt_auth) {
+ u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
+
+ ceph_encode_32(p, authlen);
+ ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
+ } else {
+ ceph_encode_32(p, 0);
+ }
+ if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
+ ceph_encode_32(p, sizeof(__le64));
+ ceph_encode_64(p, req->r_fscrypt_file);
+ } else {
+ ceph_encode_32(p, 0);
+ }
+}
+
+static struct ceph_mds_request_head_legacy *
+find_legacy_request_head(void *p, u64 features)
+{
+ bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
+ struct ceph_mds_request_head_old *ohead;
+
+ if (legacy)
+ return (struct ceph_mds_request_head_legacy *)p;
+ ohead = (struct ceph_mds_request_head_old *)p;
+ return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid;
}
/*
@@ -2565,7 +2862,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
int mds = session->s_mds;
struct ceph_mds_client *mdsc = session->s_mdsc;
struct ceph_msg *msg;
- struct ceph_mds_request_head_old *head;
+ struct ceph_mds_request_head_legacy *lhead;
const char *path1 = NULL;
const char *path2 = NULL;
u64 ino1 = 0, ino2 = 0;
@@ -2577,6 +2874,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
void *p, *end;
int ret;
bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
+ bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
+ &session->s_features);
ret = set_request_path_attr(req->r_inode, req->r_dentry,
req->r_parent, req->r_path1, req->r_ino1.ino,
@@ -2601,12 +2900,32 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
goto out_free1;
}
- len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
- len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
- sizeof(struct ceph_timespec);
- len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
+ req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
+ if (IS_ERR(req->r_altname)) {
+ msg = ERR_CAST(req->r_altname);
+ req->r_altname = NULL;
+ goto out_free2;
+ }
+
+ /*
+ * For old cephs without supporting the 32bit retry/fwd feature
+ * it will copy the raw memories directly when decoding the
+ * requests. While new cephs will decode the head depending the
+ * version member, so we need to make sure it will be compatible
+ * with them both.
+ */
+ if (legacy)
+ len = sizeof(struct ceph_mds_request_head_legacy);
+ else if (old_version)
+ len = sizeof(struct ceph_mds_request_head_old);
+ else
+ len = sizeof(struct ceph_mds_request_head);
- /* calculate (max) length for cap releases */
+ /* filepaths */
+ len += 2 * (1 + sizeof(u32) + sizeof(u64));
+ len += pathlen1 + pathlen2;
+
+ /* cap releases */
len += sizeof(struct ceph_mds_request_release) *
(!!req->r_inode_drop + !!req->r_dentry_drop +
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
@@ -2616,6 +2935,27 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
if (req->r_old_dentry_drop)
len += pathlen2;
+ /* MClientRequest tail */
+
+ /* req->r_stamp */
+ len += sizeof(struct ceph_timespec);
+
+ /* gid list */
+ len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
+
+ /* alternate name */
+ len += sizeof(u32) + req->r_altname_len;
+
+ /* fscrypt_auth */
+ len += sizeof(u32); // fscrypt_auth
+ if (req->r_fscrypt_auth)
+ len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
+
+ /* fscrypt_file */
+ len += sizeof(u32);
+ if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
+ len += sizeof(__le64);
+
msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
if (!msg) {
msg = ERR_PTR(-ENOMEM);
@@ -2624,33 +2964,40 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
msg->hdr.tid = cpu_to_le64(req->r_tid);
+ lhead = find_legacy_request_head(msg->front.iov_base,
+ session->s_con.peer_features);
+
/*
- * The old ceph_mds_request_head didn't contain a version field, and
+ * The ceph_mds_request_head_legacy didn't contain a version field, and
* one was added when we moved the message version from 3->4.
*/
if (legacy) {
msg->hdr.version = cpu_to_le16(3);
- head = msg->front.iov_base;
- p = msg->front.iov_base + sizeof(*head);
- } else {
- struct ceph_mds_request_head *new_head = msg->front.iov_base;
+ p = msg->front.iov_base + sizeof(*lhead);
+ } else if (old_version) {
+ struct ceph_mds_request_head_old *ohead = msg->front.iov_base;
msg->hdr.version = cpu_to_le16(4);
- new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
- head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
- p = msg->front.iov_base + sizeof(*new_head);
+ ohead->version = cpu_to_le16(1);
+ p = msg->front.iov_base + sizeof(*ohead);
+ } else {
+ struct ceph_mds_request_head *nhead = msg->front.iov_base;
+
+ msg->hdr.version = cpu_to_le16(6);
+ nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
+ p = msg->front.iov_base + sizeof(*nhead);
}
end = msg->front.iov_base + msg->front.iov_len;
- head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
- head->op = cpu_to_le32(req->r_op);
- head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
- req->r_cred->fsuid));
- head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
- req->r_cred->fsgid));
- head->ino = cpu_to_le64(req->r_deleg_ino);
- head->args = req->r_args;
+ lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
+ lhead->op = cpu_to_le32(req->r_op);
+ lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
+ req->r_cred->fsuid));
+ lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
+ req->r_cred->fsgid));
+ lhead->ino = cpu_to_le64(req->r_deleg_ino);
+ lhead->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
@@ -2665,15 +3012,23 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
req->r_inode ? req->r_inode : d_inode(req->r_dentry),
mds, req->r_inode_drop, req->r_inode_unless,
req->r_op == CEPH_MDS_OP_READDIR);
- if (req->r_dentry_drop)
- releases += ceph_encode_dentry_release(&p, req->r_dentry,
+ if (req->r_dentry_drop) {
+ ret = ceph_encode_dentry_release(&p, req->r_dentry,
req->r_parent, mds, req->r_dentry_drop,
req->r_dentry_unless);
- if (req->r_old_dentry_drop)
- releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
+ if (ret < 0)
+ goto out_err;
+ releases += ret;
+ }
+ if (req->r_old_dentry_drop) {
+ ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
req->r_old_dentry_dir, mds,
req->r_old_dentry_drop,
req->r_old_dentry_unless);
+ if (ret < 0)
+ goto out_err;
+ releases += ret;
+ }
if (req->r_old_inode_drop)
releases += ceph_encode_inode_release(&p,
d_inode(req->r_old_dentry),
@@ -2684,9 +3039,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
p = msg->front.iov_base + req->r_request_release_offset;
}
- head->num_releases = cpu_to_le16(releases);
+ lhead->num_releases = cpu_to_le16(releases);
- encode_timestamp_and_gids(&p, req);
+ encode_mclientrequest_tail(&p, req);
if (WARN_ON_ONCE(p > end)) {
ceph_msg_put(msg);
@@ -2715,6 +3070,10 @@ out_free1:
ceph_mdsc_free_path((char *)path1, pathlen1);
out:
return msg;
+out_err:
+ ceph_msg_put(msg);
+ msg = ERR_PTR(ret);
+ goto out_free2;
}
/*
@@ -2731,18 +3090,6 @@ static void complete_request(struct ceph_mds_client *mdsc,
complete_all(&req->r_completion);
}
-static struct ceph_mds_request_head_old *
-find_old_request_head(void *p, u64 features)
-{
- bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
- struct ceph_mds_request_head *new_head;
-
- if (legacy)
- return (struct ceph_mds_request_head_old *)p;
- new_head = (struct ceph_mds_request_head *)p;
- return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
-}
-
/*
* called under mdsc->mutex
*/
@@ -2752,29 +3099,28 @@ static int __prepare_send_request(struct ceph_mds_session *session,
{
int mds = session->s_mds;
struct ceph_mds_client *mdsc = session->s_mdsc;
- struct ceph_mds_request_head_old *rhead;
+ struct ceph_mds_request_head_legacy *lhead;
+ struct ceph_mds_request_head *nhead;
struct ceph_msg *msg;
- int flags = 0, max_retry;
+ int flags = 0, old_max_retry;
+ bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
+ &session->s_features);
/*
- * The type of 'r_attempts' in kernel 'ceph_mds_request'
- * is 'int', while in 'ceph_mds_request_head' the type of
- * 'num_retry' is '__u8'. So in case the request retries
- * exceeding 256 times, the MDS will receive a incorrect
- * retry seq.
- *
- * In this case it's ususally a bug in MDS and continue
- * retrying the request makes no sense.
- *
- * In future this could be fixed in ceph code, so avoid
- * using the hardcode here.
+ * Avoid inifinite retrying after overflow. The client will
+ * increase the retry count and if the MDS is old version,
+ * so we limit to retry at most 256 times.
*/
- max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
- max_retry = 1 << (max_retry * BITS_PER_BYTE);
- if (req->r_attempts >= max_retry) {
- pr_warn_ratelimited("%s request tid %llu seq overflow\n",
- __func__, req->r_tid);
- return -EMULTIHOP;
+ if (req->r_attempts) {
+ old_max_retry = sizeof_field(struct ceph_mds_request_head_old,
+ num_retry);
+ old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
+ if ((old_version && req->r_attempts >= old_max_retry) ||
+ ((uint32_t)req->r_attempts >= U32_MAX)) {
+ pr_warn_ratelimited("%s request tid %llu seq overflow\n",
+ __func__, req->r_tid);
+ return -EMULTIHOP;
+ }
}
req->r_attempts++;
@@ -2800,23 +3146,27 @@ static int __prepare_send_request(struct ceph_mds_session *session,
* d_move mangles the src name.
*/
msg = req->r_request;
- rhead = find_old_request_head(msg->front.iov_base,
- session->s_con.peer_features);
+ lhead = find_legacy_request_head(msg->front.iov_base,
+ session->s_con.peer_features);
- flags = le32_to_cpu(rhead->flags);
+ flags = le32_to_cpu(lhead->flags);
flags |= CEPH_MDS_FLAG_REPLAY;
- rhead->flags = cpu_to_le32(flags);
+ lhead->flags = cpu_to_le32(flags);
if (req->r_target_inode)
- rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
+ lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
- rhead->num_retry = req->r_attempts - 1;
+ lhead->num_retry = req->r_attempts - 1;
+ if (!old_version) {
+ nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
+ nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
+ }
/* remove cap/dentry releases from message */
- rhead->num_releases = 0;
+ lhead->num_releases = 0;
p = msg->front.iov_base + req->r_request_release_offset;
- encode_timestamp_and_gids(&p, req);
+ encode_mclientrequest_tail(&p, req);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2834,18 +3184,23 @@ static int __prepare_send_request(struct ceph_mds_session *session,
}
req->r_request = msg;
- rhead = find_old_request_head(msg->front.iov_base,
- session->s_con.peer_features);
- rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
+ lhead = find_legacy_request_head(msg->front.iov_base,
+ session->s_con.peer_features);
+ lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_REPLAY;
if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_ASYNC;
if (req->r_parent)
flags |= CEPH_MDS_FLAG_WANT_DENTRY;
- rhead->flags = cpu_to_le32(flags);
- rhead->num_fwd = req->r_num_fwd;
- rhead->num_retry = req->r_attempts - 1;
+ lhead->flags = cpu_to_le32(flags);
+ lhead->num_fwd = req->r_num_fwd;
+ lhead->num_retry = req->r_attempts - 1;
+ if (!old_version) {
+ nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
+ nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
+ nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
+ }
dout(" r_parent = %p\n", req->r_parent);
return 0;
@@ -3348,22 +3703,35 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
dout("handle_reply tid %lld result %d\n", tid, result);
- rinfo = &req->r_reply_info;
if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
- err = parse_reply_info(session, msg, rinfo, (u64)-1);
+ err = parse_reply_info(session, msg, req, (u64)-1);
else
- err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
+ err = parse_reply_info(session, msg, req,
+ session->s_con.peer_features);
mutex_unlock(&mdsc->mutex);
/* Must find target inode outside of mutexes to avoid deadlocks */
+ rinfo = &req->r_reply_info;
if ((err >= 0) && rinfo->head->is_target) {
- struct inode *in;
+ struct inode *in = xchg(&req->r_new_inode, NULL);
struct ceph_vino tvino = {
.ino = le64_to_cpu(rinfo->targeti.in->ino),
.snap = le64_to_cpu(rinfo->targeti.in->snapid)
};
- in = ceph_get_inode(mdsc->fsc->sb, tvino);
+ /*
+ * If we ended up opening an existing inode, discard
+ * r_new_inode
+ */
+ if (req->r_op == CEPH_MDS_OP_CREATE &&
+ !req->r_reply_info.has_create_ino) {
+ /* This should never happen on an async create */
+ WARN_ON_ONCE(req->r_deleg_ino);
+ iput(in);
+ in = NULL;
+ }
+
+ in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
if (IS_ERR(in)) {
err = PTR_ERR(in);
mutex_lock(&session->s_mutex);
@@ -3406,7 +3774,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP))
- ceph_readdir_prepopulate(req, req->r_session);
+ err = ceph_readdir_prepopulate(req, req->r_session);
}
current->journal_info = NULL;
mutex_unlock(&req->r_fill_mutex);
@@ -3491,33 +3859,21 @@ static void handle_forward(struct ceph_mds_client *mdsc,
if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
dout("forward tid %llu aborted, unregistering\n", tid);
__unregister_request(mdsc, req);
- } else if (fwd_seq <= req->r_num_fwd) {
+ } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
/*
- * The type of 'num_fwd' in ceph 'MClientRequestForward'
- * is 'int32_t', while in 'ceph_mds_request_head' the
- * type is '__u8'. So in case the request bounces between
- * MDSes exceeding 256 times, the client will get stuck.
- *
- * In this case it's ususally a bug in MDS and continue
- * bouncing the request makes no sense.
+ * Avoid inifinite retrying after overflow.
*
- * In future this could be fixed in ceph code, so avoid
- * using the hardcode here.
+ * The MDS will increase the fwd count and in client side
+ * if the num_fwd is less than the one saved in request
+ * that means the MDS is an old version and overflowed of
+ * 8 bits.
*/
- int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
- max = 1 << (max * BITS_PER_BYTE);
- if (req->r_num_fwd >= max) {
- mutex_lock(&req->r_fill_mutex);
- req->r_err = -EMULTIHOP;
- set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
- mutex_unlock(&req->r_fill_mutex);
- aborted = true;
- pr_warn_ratelimited("forward tid %llu seq overflow\n",
- tid);
- } else {
- dout("forward tid %llu to mds%d - old seq %d <= %d\n",
- tid, next_mds, req->r_num_fwd, fwd_seq);
- }
+ mutex_lock(&req->r_fill_mutex);
+ req->r_err = -EMULTIHOP;
+ set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
+ mutex_unlock(&req->r_fill_mutex);
+ aborted = true;
+ pr_warn_ratelimited("forward tid %llu seq overflow\n", tid);
} else {
/* resend. forward race not possible; mds would drop */
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
@@ -4550,6 +4906,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
dout("handle_lease from mds%d\n", mds);
+ if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+ return;
+
/* decode */
if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
goto bad;
@@ -4568,8 +4927,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
dname.len, dname.name);
mutex_lock(&session->s_mutex);
- inc_session_sequence(session);
-
if (!inode) {
dout("handle_lease no inode %llx\n", vino.ino);
goto release;
@@ -4631,9 +4988,13 @@ release:
out:
mutex_unlock(&session->s_mutex);
iput(inode);
+
+ ceph_dec_mds_stopping_blocker(mdsc);
return;
bad:
+ ceph_dec_mds_stopping_blocker(mdsc);
+
pr_err("corrupt lease message\n");
ceph_msg_dump(msg);
}
@@ -4829,6 +5190,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
}
init_completion(&mdsc->safe_umount_waiters);
+ spin_lock_init(&mdsc->stopping_lock);
+ atomic_set(&mdsc->stopping_blockers, 0);
+ init_completion(&mdsc->stopping_waiter);
init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
mdsc->quotarealms_inodes = RB_ROOT;