diff options
Diffstat (limited to 'fs/ceph/inode.c')
-rw-r--r-- | fs/ceph/inode.c | 245 |
1 file changed, 141 insertions, 104 deletions
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index c2feb310ac1e..761451f36e2d 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -33,9 +33,7 @@ static const struct inode_operations ceph_symlink_iops; -static void ceph_invalidate_work(struct work_struct *work); -static void ceph_writeback_work(struct work_struct *work); -static void ceph_vmtruncate_work(struct work_struct *work); +static void ceph_inode_work(struct work_struct *work); /* * find or create an inode, given the ceph ino number @@ -509,19 +507,16 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_snap_realm_item); INIT_LIST_HEAD(&ci->i_snap_flush_item); - INIT_WORK(&ci->i_wb_work, ceph_writeback_work); - INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work); - - INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); + INIT_WORK(&ci->i_work, ceph_inode_work); + ci->i_work_mask = 0; ceph_fscache_inode_init(ci); return &ci->vfs_inode; } -static void ceph_i_callback(struct rcu_head *head) +void ceph_free_inode(struct inode *inode) { - struct inode *inode = container_of(head, struct inode, i_rcu); struct ceph_inode_info *ci = ceph_inode(inode); kfree(ci->i_symlink); @@ -581,8 +576,6 @@ void ceph_destroy_inode(struct inode *inode) ceph_buffer_put(ci->i_xattrs.prealloc_blob); ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns)); - - call_rcu(&inode->i_rcu, ceph_i_callback); } int ceph_drop_inode(struct inode *inode) @@ -1483,7 +1476,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, pr_err("fill_inode badness on %p got %d\n", in, rc); err = rc; } - iput(in); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(in); } return err; @@ -1681,8 +1675,11 @@ retry_lookup: &req->r_caps_reservation); if (ret < 0) { pr_err("fill_inode badness on %p\n", in); - if (d_really_is_negative(dn)) - iput(in); + if (d_really_is_negative(dn)) { + /* avoid calling iput_final() in mds + * dispatch threads */ + ceph_async_iput(in); + } 
d_drop(dn); err = ret; goto next_item; @@ -1692,7 +1689,7 @@ retry_lookup: if (ceph_security_xattr_deadlock(in)) { dout(" skip splicing dn %p to inode %p" " (security xattr deadlock)\n", dn, in); - iput(in); + ceph_async_iput(in); skipped++; goto next_item; } @@ -1744,56 +1741,86 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size) } /* + * Put reference to inode, but avoid calling iput_final() in current thread. + * iput_final() may wait for readahead pages. The wait can cause deadlock in + * some contexts. + */ +void ceph_async_iput(struct inode *inode) +{ + if (!inode) + return; + for (;;) { + if (atomic_add_unless(&inode->i_count, -1, 1)) + break; + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ceph_inode(inode)->i_work)) + break; + /* queue work failed, i_count must be at least 2 */ + } +} + +/* * Write back inode data in a worker thread. (This can't be done * in the message handler context.) */ void ceph_queue_writeback(struct inode *inode) { + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask); + ihold(inode); - if (queue_work(ceph_inode_to_client(inode)->wb_wq, - &ceph_inode(inode)->i_wb_work)) { + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ci->i_work)) { dout("ceph_queue_writeback %p\n", inode); } else { - dout("ceph_queue_writeback %p failed\n", inode); + dout("ceph_queue_writeback %p already queued, mask=%lx\n", + inode, ci->i_work_mask); iput(inode); } } -static void ceph_writeback_work(struct work_struct *work) -{ - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_wb_work); - struct inode *inode = &ci->vfs_inode; - - dout("writeback %p\n", inode); - filemap_fdatawrite(&inode->i_data); - iput(inode); -} - /* * queue an async invalidation */ void ceph_queue_invalidate(struct inode *inode) { + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask); + ihold(inode); - if 
(queue_work(ceph_inode_to_client(inode)->pg_inv_wq, - &ceph_inode(inode)->i_pg_inv_work)) { + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ceph_inode(inode)->i_work)) { dout("ceph_queue_invalidate %p\n", inode); } else { - dout("ceph_queue_invalidate %p failed\n", inode); + dout("ceph_queue_invalidate %p already queued, mask=%lx\n", + inode, ci->i_work_mask); iput(inode); } } /* - * Invalidate inode pages in a worker thread. (This can't be done - * in the message handler context.) + * Queue an async vmtruncate. If we fail to queue work, we will handle + * the truncation the next time we call __ceph_do_pending_vmtruncate. */ -static void ceph_invalidate_work(struct work_struct *work) +void ceph_queue_vmtruncate(struct inode *inode) { - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_pg_inv_work); - struct inode *inode = &ci->vfs_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask); + + ihold(inode); + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ci->i_work)) { + dout("ceph_queue_vmtruncate %p\n", inode); + } else { + dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n", + inode, ci->i_work_mask); + iput(inode); + } +} + +static void ceph_do_invalidate_pages(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); u32 orig_gen; int check = 0; @@ -1845,44 +1872,6 @@ static void ceph_invalidate_work(struct work_struct *work) out: if (check) ceph_check_caps(ci, 0, NULL); - iput(inode); -} - - -/* - * called by trunc_wq; - * - * We also truncate in a separate thread as well. 
- */ -static void ceph_vmtruncate_work(struct work_struct *work) -{ - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_vmtruncate_work); - struct inode *inode = &ci->vfs_inode; - - dout("vmtruncate_work %p\n", inode); - __ceph_do_pending_vmtruncate(inode); - iput(inode); -} - -/* - * Queue an async vmtruncate. If we fail to queue work, we will handle - * the truncation the next time we call __ceph_do_pending_vmtruncate. - */ -void ceph_queue_vmtruncate(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - - ihold(inode); - - if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, - &ci->i_vmtruncate_work)) { - dout("ceph_queue_vmtruncate %p\n", inode); - } else { - dout("ceph_queue_vmtruncate %p failed, pending=%d\n", - inode, ci->i_truncate_pending); - iput(inode); - } } /* @@ -1946,6 +1935,25 @@ retry: wake_up_all(&ci->i_cap_wq); } +static void ceph_inode_work(struct work_struct *work) +{ + struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, + i_work); + struct inode *inode = &ci->vfs_inode; + + if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) { + dout("writeback %p\n", inode); + filemap_fdatawrite(&inode->i_data); + } + if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask)) + ceph_do_invalidate_pages(inode); + + if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask)) + __ceph_do_pending_vmtruncate(inode); + + iput(inode); +} + /* * symlinks */ @@ -2269,43 +2277,72 @@ int ceph_permission(struct inode *inode, int mask) return err; } +/* Craft a mask of needed caps given a set of requested statx attrs. 
*/ +static int statx_to_caps(u32 want) +{ + int mask = 0; + + if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME)) + mask |= CEPH_CAP_AUTH_SHARED; + + if (want & (STATX_NLINK|STATX_CTIME)) + mask |= CEPH_CAP_LINK_SHARED; + + if (want & (STATX_ATIME|STATX_MTIME|STATX_CTIME|STATX_SIZE| + STATX_BLOCKS)) + mask |= CEPH_CAP_FILE_SHARED; + + if (want & (STATX_CTIME)) + mask |= CEPH_CAP_XATTR_SHARED; + + return mask; +} + /* - * Get all attributes. Hopefully somedata we'll have a statlite() - * and can limit the fields we require to be accurate. + * Get all the attributes. If we have sufficient caps for the requested attrs, + * then we can avoid talking to the MDS at all. */ int ceph_getattr(const struct path *path, struct kstat *stat, u32 request_mask, unsigned int flags) { struct inode *inode = d_inode(path->dentry); struct ceph_inode_info *ci = ceph_inode(inode); - int err; + int err = 0; - err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false); - if (!err) { - generic_fillattr(inode, stat); - stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); - if (ceph_snap(inode) == CEPH_NOSNAP) - stat->dev = inode->i_sb->s_dev; + /* Skip the getattr altogether if we're asked not to sync */ + if (!(flags & AT_STATX_DONT_SYNC)) { + err = ceph_do_getattr(inode, statx_to_caps(request_mask), + flags & AT_STATX_FORCE_SYNC); + if (err) + return err; + } + + generic_fillattr(inode, stat); + stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); + if (ceph_snap(inode) == CEPH_NOSNAP) + stat->dev = inode->i_sb->s_dev; + else + stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0; + + if (S_ISDIR(inode->i_mode)) { + if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), + RBYTES)) + stat->size = ci->i_rbytes; else - stat->dev = ci->i_snapid_map ? 
ci->i_snapid_map->dev : 0; - - if (S_ISDIR(inode->i_mode)) { - if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), - RBYTES)) - stat->size = ci->i_rbytes; - else - stat->size = ci->i_files + ci->i_subdirs; - stat->blocks = 0; - stat->blksize = 65536; - /* - * Some applications rely on the number of st_nlink - * value on directories to be either 0 (if unlinked) - * or 2 + number of subdirectories. - */ - if (stat->nlink == 1) - /* '.' + '..' + subdirs */ - stat->nlink = 1 + 1 + ci->i_subdirs; - } + stat->size = ci->i_files + ci->i_subdirs; + stat->blocks = 0; + stat->blksize = 65536; + /* + * Some applications rely on the number of st_nlink + * value on directories to be either 0 (if unlinked) + * or 2 + number of subdirectories. + */ + if (stat->nlink == 1) + /* '.' + '..' + subdirs */ + stat->nlink = 1 + 1 + ci->i_subdirs; } + + /* Mask off any higher bits (e.g. btime) until we have support */ + stat->result_mask = request_mask & STATX_BASIC_STATS; return err; } |