summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--MAINTAINERS4
-rw-r--r--drivers/block/rbd.c2188
-rw-r--r--drivers/block/rbd_types.h10
-rw-r--r--fs/ceph/Kconfig12
-rw-r--r--fs/ceph/acl.c22
-rw-r--r--fs/ceph/addr.c2
-rw-r--r--fs/ceph/caps.c120
-rw-r--r--fs/ceph/debugfs.c2
-rw-r--r--fs/ceph/dir.c73
-rw-r--r--fs/ceph/export.c2
-rw-r--r--fs/ceph/file.c34
-rw-r--r--fs/ceph/inode.c208
-rw-r--r--fs/ceph/mds_client.c120
-rw-r--r--fs/ceph/mds_client.h4
-rw-r--r--fs/ceph/mdsmap.c12
-rw-r--r--fs/ceph/quota.c15
-rw-r--r--fs/ceph/snap.c3
-rw-r--r--fs/ceph/super.c13
-rw-r--r--fs/ceph/super.h67
-rw-r--r--fs/ceph/xattr.c456
-rw-r--r--include/linux/ceph/ceph_features.h1
-rw-r--r--include/linux/ceph/ceph_fs.h2
-rw-r--r--include/linux/ceph/cls_lock_client.h3
-rw-r--r--include/linux/ceph/decode.h13
-rw-r--r--include/linux/ceph/libceph.h10
-rw-r--r--include/linux/ceph/mon_client.h1
-rw-r--r--include/linux/ceph/osd_client.h12
-rw-r--r--include/linux/ceph/striper.h2
-rw-r--r--include/linux/iversion.h24
-rw-r--r--net/ceph/Makefile2
-rw-r--r--net/ceph/cls_lock_client.c54
-rw-r--r--net/ceph/decode.c84
-rw-r--r--net/ceph/messenger.c14
-rw-r--r--net/ceph/mon_client.c21
-rw-r--r--net/ceph/osd_client.c42
-rw-r--r--net/ceph/osdmap.c31
-rw-r--r--net/ceph/pagevec.c33
-rw-r--r--net/ceph/striper.c17
38 files changed, 2606 insertions, 1127 deletions
diff --git a/MAINTAINERS b/MAINTAINERS
index d51808468713..f63e9d1468f6 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -3765,7 +3765,7 @@ F: arch/powerpc/platforms/cell/
CEPH COMMON CODE (LIBCEPH)
M: Ilya Dryomov <idryomov@gmail.com>
-M: "Yan, Zheng" <zyan@redhat.com>
+M: Jeff Layton <jlayton@kernel.org>
M: Sage Weil <sage@redhat.com>
L: ceph-devel@vger.kernel.org
W: http://ceph.com/
@@ -3777,7 +3777,7 @@ F: include/linux/ceph/
F: include/linux/crush/
CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH)
-M: "Yan, Zheng" <zyan@redhat.com>
+M: Jeff Layton <jlayton@kernel.org>
M: Sage Weil <sage@redhat.com>
M: Ilya Dryomov <idryomov@gmail.com>
L: ceph-devel@vger.kernel.org
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e5009a34f9c2..3327192bb71f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_FEATURE_LAYERING (1ULL<<0)
#define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
+#define RBD_FEATURE_OBJECT_MAP (1ULL<<3)
+#define RBD_FEATURE_FAST_DIFF (1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5)
#define RBD_FEATURE_DATA_POOL (1ULL<<7)
#define RBD_FEATURE_OPERATIONS (1ULL<<8)
@@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
RBD_FEATURE_STRIPINGV2 | \
RBD_FEATURE_EXCLUSIVE_LOCK | \
+ RBD_FEATURE_OBJECT_MAP | \
+ RBD_FEATURE_FAST_DIFF | \
RBD_FEATURE_DEEP_FLATTEN | \
RBD_FEATURE_DATA_POOL | \
RBD_FEATURE_OPERATIONS)
@@ -203,6 +207,11 @@ struct rbd_client {
struct list_head node;
};
+struct pending_result {
+ int result; /* first nonzero result */
+ int num_pending;
+};
+
struct rbd_img_request;
enum obj_request_type {
@@ -219,6 +228,18 @@ enum obj_operation_type {
OBJ_OP_ZEROOUT,
};
+#define RBD_OBJ_FLAG_DELETION (1U << 0)
+#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1)
+#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2)
+#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3)
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4)
+
+enum rbd_obj_read_state {
+ RBD_OBJ_READ_START = 1,
+ RBD_OBJ_READ_OBJECT,
+ RBD_OBJ_READ_PARENT,
+};
+
/*
* Writes go through the following state machine to deal with
* layering:
@@ -245,17 +266,28 @@ enum obj_operation_type {
* even if there is a parent).
*/
enum rbd_obj_write_state {
- RBD_OBJ_WRITE_FLAT = 1,
- RBD_OBJ_WRITE_GUARD,
- RBD_OBJ_WRITE_READ_FROM_PARENT,
- RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
- RBD_OBJ_WRITE_COPYUP_OPS,
+ RBD_OBJ_WRITE_START = 1,
+ RBD_OBJ_WRITE_PRE_OBJECT_MAP,
+ RBD_OBJ_WRITE_OBJECT,
+ __RBD_OBJ_WRITE_COPYUP,
+ RBD_OBJ_WRITE_COPYUP,
+ RBD_OBJ_WRITE_POST_OBJECT_MAP,
+};
+
+enum rbd_obj_copyup_state {
+ RBD_OBJ_COPYUP_START = 1,
+ RBD_OBJ_COPYUP_READ_PARENT,
+ __RBD_OBJ_COPYUP_OBJECT_MAPS,
+ RBD_OBJ_COPYUP_OBJECT_MAPS,
+ __RBD_OBJ_COPYUP_WRITE_OBJECT,
+ RBD_OBJ_COPYUP_WRITE_OBJECT,
};
struct rbd_obj_request {
struct ceph_object_extent ex;
+ unsigned int flags; /* RBD_OBJ_FLAG_* */
union {
- bool tried_parent; /* for reads */
+ enum rbd_obj_read_state read_state; /* for reads */
enum rbd_obj_write_state write_state; /* for writes */
};
@@ -271,14 +303,15 @@ struct rbd_obj_request {
u32 bvec_idx;
};
};
+
+ enum rbd_obj_copyup_state copyup_state;
struct bio_vec *copyup_bvecs;
u32 copyup_bvec_count;
- struct ceph_osd_request *osd_req;
-
- u64 xferred; /* bytes transferred */
- int result;
+ struct list_head osd_reqs; /* w/ r_private_item */
+ struct mutex state_mutex;
+ struct pending_result pending;
struct kref kref;
};
@@ -287,11 +320,19 @@ enum img_req_flags {
IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
};
+enum rbd_img_state {
+ RBD_IMG_START = 1,
+ RBD_IMG_EXCLUSIVE_LOCK,
+ __RBD_IMG_OBJECT_REQUESTS,
+ RBD_IMG_OBJECT_REQUESTS,
+};
+
struct rbd_img_request {
struct rbd_device *rbd_dev;
enum obj_operation_type op_type;
enum obj_request_type data_type;
unsigned long flags;
+ enum rbd_img_state state;
union {
u64 snap_id; /* for reads */
struct ceph_snap_context *snapc; /* for writes */
@@ -300,13 +341,14 @@ struct rbd_img_request {
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
};
- spinlock_t completion_lock;
- u64 xferred;/* aggregate bytes transferred */
- int result; /* first nonzero obj_request result */
+ struct list_head lock_item;
struct list_head object_extents; /* obj_req.ex structs */
- u32 pending_count;
+ struct mutex state_mutex;
+ struct pending_result pending;
+ struct work_struct work;
+ int work_result;
struct kref kref;
};
@@ -380,7 +422,17 @@ struct rbd_device {
struct work_struct released_lock_work;
struct delayed_work lock_dwork;
struct work_struct unlock_work;
- wait_queue_head_t lock_waitq;
+ spinlock_t lock_lists_lock;
+ struct list_head acquiring_list;
+ struct list_head running_list;
+ struct completion acquire_wait;
+ int acquire_err;
+ struct completion releasing_wait;
+
+ spinlock_t object_map_lock;
+ u8 *object_map;
+ u64 object_map_size; /* in objects */
+ u64 object_map_flags;
struct workqueue_struct *task_wq;
@@ -408,12 +460,10 @@ struct rbd_device {
* Flag bits for rbd_dev->flags:
* - REMOVING (which is coupled with rbd_dev->open_count) is protected
* by rbd_dev->lock
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
*/
enum rbd_dev_flags {
RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
- RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
@@ -466,6 +516,8 @@ static int minor_to_rbd_dev_id(int minor)
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
+
return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}
@@ -583,6 +635,26 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
u64 *snap_features);
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
+
+/*
+ * Return true if nothing else is pending.
+ */
+static bool pending_result_dec(struct pending_result *pending, int *result)
+{
+ rbd_assert(pending->num_pending > 0);
+
+ if (*result && !pending->result)
+ pending->result = *result;
+ if (--pending->num_pending)
+ return false;
+
+ *result = pending->result;
+ return true;
+}
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
@@ -1317,6 +1389,8 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
u32 bytes)
{
+ dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
+
switch (obj_req->img_request->data_type) {
case OBJ_REQUEST_BIO:
zero_bios(&obj_req->bio_pos, off, bytes);
@@ -1339,13 +1413,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
- dout("%s: img %p (was %d)\n", __func__, img_request,
- kref_read(&img_request->kref));
- kref_get(&img_request->kref);
-}
-
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
@@ -1362,7 +1429,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
/* Image request now owns object's original reference */
obj_request->img_request = img_request;
- img_request->pending_count++;
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}
@@ -1375,13 +1441,13 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
rbd_obj_request_put(obj_request);
}
-static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
+static void rbd_osd_submit(struct ceph_osd_request *osd_req)
{
- struct ceph_osd_request *osd_req = obj_request->osd_req;
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
- dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
- obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
- obj_request->ex.oe_len, osd_req);
+ dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
+ __func__, osd_req, obj_req, obj_req->ex.oe_objno,
+ obj_req->ex.oe_off, obj_req->ex.oe_len);
ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}
@@ -1457,41 +1523,38 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
}
}
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
-
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
struct rbd_obj_request *obj_req = osd_req->r_priv;
+ int result;
dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
osd_req->r_result, obj_req);
- rbd_assert(osd_req == obj_req->osd_req);
- obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
- if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
- obj_req->xferred = osd_req->r_result;
+ /*
+ * Writes aren't allowed to return a data payload. In some
+ * guarded write cases (e.g. stat + zero on an empty object)
+ * a stat response makes it through, but we don't care.
+ */
+ if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
+ result = 0;
else
- /*
- * Writes aren't allowed to return a data payload. In some
- * guarded write cases (e.g. stat + zero on an empty object)
- * a stat response makes it through, but we don't care.
- */
- obj_req->xferred = 0;
+ result = osd_req->r_result;
- rbd_obj_handle_request(obj_req);
+ rbd_obj_handle_request(obj_req, result);
}
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
{
- struct ceph_osd_request *osd_req = obj_request->osd_req;
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
osd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req->r_snapid = obj_request->img_request->snap_id;
}
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
{
- struct ceph_osd_request *osd_req = obj_request->osd_req;
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
ktime_get_real_ts64(&osd_req->r_mtime);
@@ -1499,19 +1562,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
}
static struct ceph_osd_request *
-__rbd_osd_req_create(struct rbd_obj_request *obj_req,
- struct ceph_snap_context *snapc, unsigned int num_ops)
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
+ struct ceph_snap_context *snapc, int num_ops)
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_request *req;
const char *name_format = rbd_dev->image_format == 1 ?
RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+ int ret;
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
if (!req)
- return NULL;
+ return ERR_PTR(-ENOMEM);
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
req->r_callback = rbd_osd_req_callback;
req->r_priv = obj_req;
@@ -1522,27 +1587,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req,
ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
req->r_base_oloc.pool = rbd_dev->layout.pool_id;
- if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
- rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
- goto err_req;
+ ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+ rbd_dev->header.object_prefix,
+ obj_req->ex.oe_objno);
+ if (ret)
+ return ERR_PTR(ret);
return req;
-
-err_req:
- ceph_osdc_put_request(req);
- return NULL;
}
static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
{
- return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
- num_ops);
-}
-
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
-{
- ceph_osdc_put_request(osd_req);
+ return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
+ num_ops);
}
static struct rbd_obj_request *rbd_obj_request_create(void)
@@ -1554,6 +1612,8 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
return NULL;
ceph_object_extent_init(&obj_request->ex);
+ INIT_LIST_HEAD(&obj_request->osd_reqs);
+ mutex_init(&obj_request->state_mutex);
kref_init(&obj_request->kref);
dout("%s %p\n", __func__, obj_request);
@@ -1563,14 +1623,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
static void rbd_obj_request_destroy(struct kref *kref)
{
struct rbd_obj_request *obj_request;
+ struct ceph_osd_request *osd_req;
u32 i;
obj_request = container_of(kref, struct rbd_obj_request, kref);
dout("%s: obj %p\n", __func__, obj_request);
- if (obj_request->osd_req)
- rbd_osd_req_destroy(obj_request->osd_req);
+ while (!list_empty(&obj_request->osd_reqs)) {
+ osd_req = list_first_entry(&obj_request->osd_reqs,
+ struct ceph_osd_request, r_private_item);
+ list_del_init(&osd_req->r_private_item);
+ ceph_osdc_put_request(osd_req);
+ }
switch (obj_request->img_request->data_type) {
case OBJ_REQUEST_NODATA:
@@ -1684,8 +1749,9 @@ static struct rbd_img_request *rbd_img_request_create(
if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_request);
- spin_lock_init(&img_request->completion_lock);
+ INIT_LIST_HEAD(&img_request->lock_item);
INIT_LIST_HEAD(&img_request->object_extents);
+ mutex_init(&img_request->state_mutex);
kref_init(&img_request->kref);
dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
@@ -1703,6 +1769,7 @@ static void rbd_img_request_destroy(struct kref *kref)
dout("%s: img %p\n", __func__, img_request);
+ WARN_ON(!list_empty(&img_request->lock_item));
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
@@ -1717,6 +1784,466 @@ static void rbd_img_request_destroy(struct kref *kref)
kmem_cache_free(rbd_img_request_cache, img_request);
}
+#define BITS_PER_OBJ 2
+#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ)
+#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1)
+
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
+ u64 *index, u8 *shift)
+{
+ u32 off;
+
+ rbd_assert(objno < rbd_dev->object_map_size);
+ *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
+ *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
+}
+
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+ u64 index;
+ u8 shift;
+
+ lockdep_assert_held(&rbd_dev->object_map_lock);
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
+ return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
+}
+
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
+{
+ u64 index;
+ u8 shift;
+ u8 *p;
+
+ lockdep_assert_held(&rbd_dev->object_map_lock);
+ rbd_assert(!(val & ~OBJ_MASK));
+
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
+ p = &rbd_dev->object_map[index];
+ *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
+}
+
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+ u8 state;
+
+ spin_lock(&rbd_dev->object_map_lock);
+ state = __rbd_object_map_get(rbd_dev, objno);
+ spin_unlock(&rbd_dev->object_map_lock);
+ return state;
+}
+
+static bool use_object_map(struct rbd_device *rbd_dev)
+{
+ return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
+ !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
+}
+
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
+{
+ u8 state;
+
+ /* fall back to default logic if object map is disabled or invalid */
+ if (!use_object_map(rbd_dev))
+ return true;
+
+ state = rbd_object_map_get(rbd_dev, objno);
+ return state != OBJECT_NONEXISTENT;
+}
+
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
+ struct ceph_object_id *oid)
+{
+ if (snap_id == CEPH_NOSNAP)
+ ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
+ rbd_dev->spec->image_id);
+ else
+ ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
+ rbd_dev->spec->image_id, snap_id);
+}
+
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ CEPH_DEFINE_OID_ONSTACK(oid);
+ u8 lock_type;
+ char *lock_tag;
+ struct ceph_locker *lockers;
+ u32 num_lockers;
+ bool broke_lock = false;
+ int ret;
+
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+again:
+ ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
+ if (ret != -EBUSY || broke_lock) {
+ if (ret == -EEXIST)
+ ret = 0; /* already locked by myself */
+ if (ret)
+ rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
+ return ret;
+ }
+
+ ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
+ RBD_LOCK_NAME, &lock_type, &lock_tag,
+ &lockers, &num_lockers);
+ if (ret) {
+ if (ret == -ENOENT)
+ goto again;
+
+ rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
+ return ret;
+ }
+
+ kfree(lock_tag);
+ if (num_lockers == 0)
+ goto again;
+
+ rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
+ ENTITY_NAME(lockers[0].id.name));
+
+ ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
+ RBD_LOCK_NAME, lockers[0].id.cookie,
+ &lockers[0].id.name);
+ ceph_free_lockers(lockers, num_lockers);
+ if (ret) {
+ if (ret == -ENOENT)
+ goto again;
+
+ rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
+ return ret;
+ }
+
+ broke_lock = true;
+ goto again;
+}
+
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ CEPH_DEFINE_OID_ONSTACK(oid);
+ int ret;
+
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+ ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ "");
+ if (ret && ret != -ENOENT)
+ rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
+}
+
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
+{
+ u8 struct_v;
+ u32 struct_len;
+ u32 header_len;
+ void *header_end;
+ int ret;
+
+ ceph_decode_32_safe(p, end, header_len, e_inval);
+ header_end = *p + header_len;
+
+ ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
+ &struct_len);
+ if (ret)
+ return ret;
+
+ ceph_decode_64_safe(p, end, *object_map_size, e_inval);
+
+ *p = header_end;
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ CEPH_DEFINE_OID_ONSTACK(oid);
+ struct page **pages;
+ void *p, *end;
+ size_t reply_len;
+ u64 num_objects;
+ u64 object_map_bytes;
+ u64 object_map_size;
+ int num_pages;
+ int ret;
+
+ rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
+
+ num_objects = ceph_get_num_objects(&rbd_dev->layout,
+ rbd_dev->mapping.size);
+ object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
+ BITS_PER_BYTE);
+ num_pages = calc_pages_for(0, object_map_bytes) + 1;
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ reply_len = num_pages * PAGE_SIZE;
+ rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
+ ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
+ "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
+ NULL, 0, pages, &reply_len);
+ if (ret)
+ goto out;
+
+ p = page_address(pages[0]);
+ end = p + min(reply_len, (size_t)PAGE_SIZE);
+ ret = decode_object_map_header(&p, end, &object_map_size);
+ if (ret)
+ goto out;
+
+ if (object_map_size != num_objects) {
+ rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
+ object_map_size, num_objects);
+ ret = -EINVAL;
+ goto out;
+ }
+
+ if (offset_in_page(p) + object_map_bytes > reply_len) {
+ ret = -EINVAL;
+ goto out;
+ }
+
+ rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
+ if (!rbd_dev->object_map) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ rbd_dev->object_map_size = object_map_size;
+ ceph_copy_from_page_vector(pages, rbd_dev->object_map,
+ offset_in_page(p), object_map_bytes);
+
+out:
+ ceph_release_page_vector(pages, num_pages);
+ return ret;
+}
+
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
+{
+ kvfree(rbd_dev->object_map);
+ rbd_dev->object_map = NULL;
+ rbd_dev->object_map_size = 0;
+}
+
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ ret = __rbd_object_map_load(rbd_dev);
+ if (ret)
+ return ret;
+
+ ret = rbd_dev_v2_get_flags(rbd_dev);
+ if (ret) {
+ rbd_object_map_free(rbd_dev);
+ return ret;
+ }
+
+ if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
+ rbd_warn(rbd_dev, "object map is invalid");
+
+ return 0;
+}
+
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ ret = rbd_object_map_lock(rbd_dev);
+ if (ret)
+ return ret;
+
+ ret = rbd_object_map_load(rbd_dev);
+ if (ret) {
+ rbd_object_map_unlock(rbd_dev);
+ return ret;
+ }
+
+ return 0;
+}
+
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
+{
+ rbd_object_map_free(rbd_dev);
+ rbd_object_map_unlock(rbd_dev);
+}
+
+/*
+ * This function needs snap_id (or more precisely just something to
+ * distinguish between HEAD and snapshot object maps), new_state and
+ * current_state that were passed to rbd_object_map_update().
+ *
+ * To avoid allocating and stashing a context we piggyback on the OSD
+ * request. A HEAD update has two ops (assert_locked). For new_state
+ * and current_state we decode our own object_map_update op, encoded in
+ * rbd_cls_object_map_update().
+ */
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
+ struct ceph_osd_request *osd_req)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ struct ceph_osd_data *osd_data;
+ u64 objno;
+ u8 state, new_state, current_state;
+ bool has_current_state;
+ void *p;
+
+ if (osd_req->r_result)
+ return osd_req->r_result;
+
+ /*
+ * Nothing to do for a snapshot object map.
+ */
+ if (osd_req->r_num_ops == 1)
+ return 0;
+
+ /*
+ * Update in-memory HEAD object map.
+ */
+ rbd_assert(osd_req->r_num_ops == 2);
+ osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
+ rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
+
+ p = page_address(osd_data->pages[0]);
+ objno = ceph_decode_64(&p);
+ rbd_assert(objno == obj_req->ex.oe_objno);
+ rbd_assert(ceph_decode_64(&p) == objno + 1);
+ new_state = ceph_decode_8(&p);
+ has_current_state = ceph_decode_8(&p);
+ if (has_current_state)
+ current_state = ceph_decode_8(&p);
+
+ spin_lock(&rbd_dev->object_map_lock);
+ state = __rbd_object_map_get(rbd_dev, objno);
+ if (!has_current_state || current_state == state ||
+ (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
+ __rbd_object_map_set(rbd_dev, objno, new_state);
+ spin_unlock(&rbd_dev->object_map_lock);
+
+ return 0;
+}
+
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
+{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
+ int result;
+
+ dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+ osd_req->r_result, obj_req);
+
+ result = rbd_object_map_update_finish(obj_req, osd_req);
+ rbd_obj_handle_request(obj_req, result);
+}
+
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
+{
+ u8 state = rbd_object_map_get(rbd_dev, objno);
+
+ if (state == new_state ||
+ (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
+ (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
+ return false;
+
+ return true;
+}
+
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
+ int which, u64 objno, u8 new_state,
+ const u8 *current_state)
+{
+ struct page **pages;
+ void *p, *start;
+ int ret;
+
+ ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
+ if (ret)
+ return ret;
+
+ pages = ceph_alloc_page_vector(1, GFP_NOIO);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ p = start = page_address(pages[0]);
+ ceph_encode_64(&p, objno);
+ ceph_encode_64(&p, objno + 1);
+ ceph_encode_8(&p, new_state);
+ if (current_state) {
+ ceph_encode_8(&p, 1);
+ ceph_encode_8(&p, *current_state);
+ } else {
+ ceph_encode_8(&p, 0);
+ }
+
+ osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
+ false, true);
+ return 0;
+}
+
+/*
+ * Return:
+ * 0 - object map update sent
+ * 1 - object map update isn't needed
+ * <0 - error
+ */
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
+ u8 new_state, const u8 *current_state)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ struct ceph_osd_request *req;
+ int num_ops = 1;
+ int which = 0;
+ int ret;
+
+ if (snap_id == CEPH_NOSNAP) {
+ if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
+ return 1;
+
+ num_ops++; /* assert_locked */
+ }
+
+ req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
+ if (!req)
+ return -ENOMEM;
+
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
+ req->r_callback = rbd_object_map_callback;
+ req->r_priv = obj_req;
+
+ rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
+ ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
+ ktime_get_real_ts64(&req->r_mtime);
+
+ if (snap_id == CEPH_NOSNAP) {
+ /*
+ * Protect against possible race conditions during lock
+ * ownership transitions.
+ */
+ ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "");
+ if (ret)
+ return ret;
+ }
+
+ ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
+ new_state, current_state);
+ if (ret)
+ return ret;
+
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ if (ret)
+ return ret;
+
+ ceph_osdc_start_request(osdc, req, false);
+ return 0;
+}
+
static void prune_extents(struct ceph_file_extent *img_extents,
u32 *num_img_extents, u64 overlap)
{
@@ -1764,11 +2291,13 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
return 0;
}
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
+
switch (obj_req->img_request->data_type) {
case OBJ_REQUEST_BIO:
- osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+ osd_req_op_extent_osd_data_bio(osd_req, which,
&obj_req->bio_pos,
obj_req->ex.oe_len);
break;
@@ -1777,7 +2306,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
rbd_assert(obj_req->bvec_pos.iter.bi_size ==
obj_req->ex.oe_len);
rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
- osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+ osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
&obj_req->bvec_pos);
break;
default:
@@ -1785,22 +2314,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
}
}
-static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
-{
- obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
- if (!obj_req->osd_req)
- return -ENOMEM;
-
- osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
- obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
- rbd_osd_req_setup_data(obj_req, 0);
-
- rbd_osd_req_format_read(obj_req);
- return 0;
-}
-
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
- unsigned int which)
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
{
struct page **pages;
@@ -1816,45 +2330,60 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
if (IS_ERR(pages))
return PTR_ERR(pages);
- osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
- osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
+ osd_req_op_raw_data_in_pages(osd_req, which, pages,
8 + sizeof(struct ceph_timespec),
0, false, true);
return 0;
}
-static int count_write_ops(struct rbd_obj_request *obj_req)
+static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
+ u32 bytes)
+{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
+ int ret;
+
+ ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
+ if (ret)
+ return ret;
+
+ osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
+ obj_req->copyup_bvec_count, bytes);
+ return 0;
+}
+
+static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
{
- return 2; /* setallochint + write/writefull */
+ obj_req->read_state = RBD_OBJ_READ_START;
+ return 0;
}
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
- unsigned int which)
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+ int which)
{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
u16 opcode;
- osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
- rbd_dev->layout.object_size,
- rbd_dev->layout.object_size);
+ if (!use_object_map(rbd_dev) ||
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
+ osd_req_op_alloc_hint_init(osd_req, which++,
+ rbd_dev->layout.object_size,
+ rbd_dev->layout.object_size);
+ }
if (rbd_obj_is_entire(obj_req))
opcode = CEPH_OSD_OP_WRITEFULL;
else
opcode = CEPH_OSD_OP_WRITE;
- osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+ osd_req_op_extent_init(osd_req, which, opcode,
obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
- rbd_osd_req_setup_data(obj_req, which++);
-
- rbd_assert(which == obj_req->osd_req->r_num_ops);
- rbd_osd_req_format_write(obj_req);
+ rbd_osd_setup_data(osd_req, which);
}
-static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
+static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
{
- unsigned int num_osd_ops, which = 0;
- bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
@@ -1862,24 +2391,10 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
if (ret)
return ret;
- need_guard = rbd_obj_copyup_enabled(obj_req);
- num_osd_ops = need_guard + count_write_ops(obj_req);
-
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
- if (!obj_req->osd_req)
- return -ENOMEM;
-
- if (need_guard) {
- ret = __rbd_obj_setup_stat(obj_req, which++);
- if (ret)
- return ret;
+ if (rbd_obj_copyup_enabled(obj_req))
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
- } else {
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- }
-
- __rbd_obj_setup_write(obj_req, which);
+ obj_req->write_state = RBD_OBJ_WRITE_START;
return 0;
}
@@ -1889,11 +2404,26 @@ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
CEPH_OSD_OP_ZERO;
}
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
+ int which)
+{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
+ } else {
+ osd_req_op_extent_init(osd_req, which,
+ truncate_or_zero_opcode(obj_req),
+ obj_req->ex.oe_off, obj_req->ex.oe_len,
+ 0, 0);
+ }
+}
+
+static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
- u64 off = obj_req->ex.oe_off;
- u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
+ u64 off, next_off;
int ret;
/*
@@ -1906,10 +2436,17 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
*/
if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
!rbd_obj_is_tail(obj_req)) {
- off = round_up(off, rbd_dev->opts->alloc_size);
- next_off = round_down(next_off, rbd_dev->opts->alloc_size);
+ off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
+ next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
+ rbd_dev->opts->alloc_size);
if (off >= next_off)
return 1;
+
+ dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+ obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+ off, next_off - off);
+ obj_req->ex.oe_off = off;
+ obj_req->ex.oe_len = next_off - off;
}
/* reverse map the entire object onto the parent */
@@ -1917,52 +2454,29 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
if (ret)
return ret;
- obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
- if (!obj_req->osd_req)
- return -ENOMEM;
-
- if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
- osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
- } else {
- dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
- obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
- off, next_off - off);
- osd_req_op_extent_init(obj_req->osd_req, 0,
- truncate_or_zero_opcode(obj_req),
- off, next_off - off, 0, 0);
- }
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- rbd_osd_req_format_write(obj_req);
+ obj_req->write_state = RBD_OBJ_WRITE_START;
return 0;
}
-static int count_zeroout_ops(struct rbd_obj_request *obj_req)
-{
- int num_osd_ops;
-
- if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
- !rbd_obj_copyup_enabled(obj_req))
- num_osd_ops = 2; /* create + truncate */
- else
- num_osd_ops = 1; /* delete/truncate/zero */
-
- return num_osd_ops;
-}
-
-static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
- unsigned int which)
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
+ int which)
{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
u16 opcode;
if (rbd_obj_is_entire(obj_req)) {
if (obj_req->num_img_extents) {
- if (!rbd_obj_copyup_enabled(obj_req))
- osd_req_op_init(obj_req->osd_req, which++,
+ if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+ osd_req_op_init(osd_req, which++,
CEPH_OSD_OP_CREATE, 0);
opcode = CEPH_OSD_OP_TRUNCATE;
} else {
- osd_req_op_init(obj_req->osd_req, which++,
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+ osd_req_op_init(osd_req, which++,
CEPH_OSD_OP_DELETE, 0);
opcode = 0;
}
@@ -1971,18 +2485,13 @@ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
}
if (opcode)
- osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+ osd_req_op_extent_init(osd_req, which, opcode,
obj_req->ex.oe_off, obj_req->ex.oe_len,
0, 0);
-
- rbd_assert(which == obj_req->osd_req->r_num_ops);
- rbd_osd_req_format_write(obj_req);
}
-static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
+static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
{
- unsigned int num_osd_ops, which = 0;
- bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
@@ -1990,31 +2499,66 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
if (ret)
return ret;
- need_guard = rbd_obj_copyup_enabled(obj_req);
- num_osd_ops = need_guard + count_zeroout_ops(obj_req);
+ if (rbd_obj_copyup_enabled(obj_req))
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
+ if (!obj_req->num_img_extents) {
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+ if (rbd_obj_is_entire(obj_req))
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
+ }
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
- if (!obj_req->osd_req)
- return -ENOMEM;
+ obj_req->write_state = RBD_OBJ_WRITE_START;
+ return 0;
+}
- if (need_guard) {
- ret = __rbd_obj_setup_stat(obj_req, which++);
- if (ret)
- return ret;
+static int count_write_ops(struct rbd_obj_request *obj_req)
+{
+ struct rbd_img_request *img_req = obj_req->img_request;
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
- } else {
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+ switch (img_req->op_type) {
+ case OBJ_OP_WRITE:
+ if (!use_object_map(img_req->rbd_dev) ||
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
+ return 2; /* setallochint + write/writefull */
+
+ return 1; /* write/writefull */
+ case OBJ_OP_DISCARD:
+ return 1; /* delete/truncate/zero */
+ case OBJ_OP_ZEROOUT:
+ if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+ !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+ return 2; /* create + truncate */
+
+ return 1; /* delete/truncate/zero */
+ default:
+ BUG();
}
+}
- __rbd_obj_setup_zeroout(obj_req, which);
- return 0;
+static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+ int which)
+{
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+ switch (obj_req->img_request->op_type) {
+ case OBJ_OP_WRITE:
+ __rbd_osd_setup_write_ops(osd_req, which);
+ break;
+ case OBJ_OP_DISCARD:
+ __rbd_osd_setup_discard_ops(osd_req, which);
+ break;
+ case OBJ_OP_ZEROOUT:
+ __rbd_osd_setup_zeroout_ops(osd_req, which);
+ break;
+ default:
+ BUG();
+ }
}
/*
- * For each object request in @img_req, allocate an OSD request, add
- * individual OSD ops and prepare them for submission. The number of
- * OSD ops depends on op_type and the overlap point (if any).
+ * Prune the list of object requests (adjust offset and/or length, drop
+ * redundant requests). Prepare object request state machines and image
+ * request state machine for execution.
*/
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
@@ -2024,16 +2568,16 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
switch (img_req->op_type) {
case OBJ_OP_READ:
- ret = rbd_obj_setup_read(obj_req);
+ ret = rbd_obj_init_read(obj_req);
break;
case OBJ_OP_WRITE:
- ret = rbd_obj_setup_write(obj_req);
+ ret = rbd_obj_init_write(obj_req);
break;
case OBJ_OP_DISCARD:
- ret = rbd_obj_setup_discard(obj_req);
+ ret = rbd_obj_init_discard(obj_req);
break;
case OBJ_OP_ZEROOUT:
- ret = rbd_obj_setup_zeroout(obj_req);
+ ret = rbd_obj_init_zeroout(obj_req);
break;
default:
BUG();
@@ -2041,17 +2585,12 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
if (ret < 0)
return ret;
if (ret > 0) {
- img_req->xferred += obj_req->ex.oe_len;
- img_req->pending_count--;
rbd_img_obj_request_del(img_req, obj_req);
continue;
}
-
- ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
- if (ret)
- return ret;
}
+ img_req->state = RBD_IMG_START;
return 0;
}
@@ -2340,17 +2879,55 @@ static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
&it);
}
-static void rbd_img_request_submit(struct rbd_img_request *img_request)
+static void rbd_img_handle_request_work(struct work_struct *work)
{
- struct rbd_obj_request *obj_request;
+ struct rbd_img_request *img_req =
+ container_of(work, struct rbd_img_request, work);
- dout("%s: img %p\n", __func__, img_request);
+ rbd_img_handle_request(img_req, img_req->work_result);
+}
- rbd_img_request_get(img_request);
- for_each_obj_request(img_request, obj_request)
- rbd_obj_request_submit(obj_request);
+static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
+{
+ INIT_WORK(&img_req->work, rbd_img_handle_request_work);
+ img_req->work_result = result;
+ queue_work(rbd_wq, &img_req->work);
+}
- rbd_img_request_put(img_request);
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+ return true;
+ }
+
+ dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
+ obj_req->ex.oe_objno);
+ return false;
+}
+
+static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
+{
+ struct ceph_osd_request *osd_req;
+ int ret;
+
+ osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
+ if (IS_ERR(osd_req))
+ return PTR_ERR(osd_req);
+
+ osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
+ obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+ rbd_osd_setup_data(osd_req, 0);
+ rbd_osd_format_read(osd_req);
+
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+ if (ret)
+ return ret;
+
+ rbd_osd_submit(osd_req);
+ return 0;
}
static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
@@ -2396,51 +2973,144 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
return ret;
}
- rbd_img_request_submit(child_img_req);
+ /* avoid parent chain recursion */
+ rbd_img_schedule(child_img_req, 0);
return 0;
}
-static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
int ret;
- if (obj_req->result == -ENOENT &&
- rbd_dev->parent_overlap && !obj_req->tried_parent) {
- /* reverse map this object extent onto the parent */
- ret = rbd_obj_calc_img_extents(obj_req, false);
+again:
+ switch (obj_req->read_state) {
+ case RBD_OBJ_READ_START:
+ rbd_assert(!*result);
+
+ if (!rbd_obj_may_exist(obj_req)) {
+ *result = -ENOENT;
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
+ goto again;
+ }
+
+ ret = rbd_obj_read_object(obj_req);
if (ret) {
- obj_req->result = ret;
+ *result = ret;
return true;
}
-
- if (obj_req->num_img_extents) {
- obj_req->tried_parent = true;
- ret = rbd_obj_read_from_parent(obj_req);
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
+ return false;
+ case RBD_OBJ_READ_OBJECT:
+ if (*result == -ENOENT && rbd_dev->parent_overlap) {
+ /* reverse map this object extent onto the parent */
+ ret = rbd_obj_calc_img_extents(obj_req, false);
if (ret) {
- obj_req->result = ret;
+ *result = ret;
return true;
}
- return false;
+ if (obj_req->num_img_extents) {
+ ret = rbd_obj_read_from_parent(obj_req);
+ if (ret) {
+ *result = ret;
+ return true;
+ }
+ obj_req->read_state = RBD_OBJ_READ_PARENT;
+ return false;
+ }
+ }
+
+ /*
+ * -ENOENT means a hole in the image -- zero-fill the entire
+ * length of the request. A short read also implies zero-fill
+ * to the end of the request.
+ */
+ if (*result == -ENOENT) {
+ rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
+ *result = 0;
+ } else if (*result >= 0) {
+ if (*result < obj_req->ex.oe_len)
+ rbd_obj_zero_range(obj_req, *result,
+ obj_req->ex.oe_len - *result);
+ else
+ rbd_assert(*result == obj_req->ex.oe_len);
+ *result = 0;
}
+ return true;
+ case RBD_OBJ_READ_PARENT:
+ return true;
+ default:
+ BUG();
}
+}
- /*
- * -ENOENT means a hole in the image -- zero-fill the entire
- * length of the request. A short read also implies zero-fill
- * to the end of the request. In both cases we update xferred
- * count to indicate the whole request was satisfied.
- */
- if (obj_req->result == -ENOENT ||
- (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
- rbd_assert(!obj_req->xferred || !obj_req->result);
- rbd_obj_zero_range(obj_req, obj_req->xferred,
- obj_req->ex.oe_len - obj_req->xferred);
- obj_req->result = 0;
- obj_req->xferred = obj_req->ex.oe_len;
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+
+ if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
+ (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
+ dout("%s %p noop for nonexistent\n", __func__, obj_req);
+ return true;
}
- return true;
+ return false;
+}
+
+/*
+ * Return:
+ * 0 - object map update sent
+ * 1 - object map update isn't needed
+ * <0 - error
+ */
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ u8 new_state;
+
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+ return 1;
+
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+ new_state = OBJECT_PENDING;
+ else
+ new_state = OBJECT_EXISTS;
+
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
+}
+
+static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
+{
+ struct ceph_osd_request *osd_req;
+ int num_ops = count_write_ops(obj_req);
+ int which = 0;
+ int ret;
+
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
+ num_ops++; /* stat */
+
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+ if (IS_ERR(osd_req))
+ return PTR_ERR(osd_req);
+
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+ ret = rbd_osd_setup_stat(osd_req, which++);
+ if (ret)
+ return ret;
+ }
+
+ rbd_osd_setup_write_ops(osd_req, which);
+ rbd_osd_format_write(osd_req);
+
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+ if (ret)
+ return ret;
+
+ rbd_osd_submit(osd_req);
+ return 0;
}
/*
@@ -2463,123 +3133,67 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
#define MODS_ONLY U32_MAX
-static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
- u32 bytes)
+static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+ u32 bytes)
{
+ struct ceph_osd_request *osd_req;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
- rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
rbd_assert(bytes > 0 && bytes != MODS_ONLY);
- rbd_osd_req_destroy(obj_req->osd_req);
- obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
- if (!obj_req->osd_req)
- return -ENOMEM;
+ osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
+ if (IS_ERR(osd_req))
+ return PTR_ERR(osd_req);
- ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
+ ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
if (ret)
return ret;
- osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
- obj_req->copyup_bvecs,
- obj_req->copyup_bvec_count,
- bytes);
- rbd_osd_req_format_write(obj_req);
+ rbd_osd_format_write(osd_req);
- ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
if (ret)
return ret;
- rbd_obj_request_submit(obj_req);
+ rbd_osd_submit(osd_req);
return 0;
}
-static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
+static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
+ u32 bytes)
{
- struct rbd_img_request *img_req = obj_req->img_request;
- unsigned int num_osd_ops = (bytes != MODS_ONLY);
- unsigned int which = 0;
+ struct ceph_osd_request *osd_req;
+ int num_ops = count_write_ops(obj_req);
+ int which = 0;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
- rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
- obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
- rbd_osd_req_destroy(obj_req->osd_req);
- switch (img_req->op_type) {
- case OBJ_OP_WRITE:
- num_osd_ops += count_write_ops(obj_req);
- break;
- case OBJ_OP_ZEROOUT:
- num_osd_ops += count_zeroout_ops(obj_req);
- break;
- default:
- BUG();
- }
+ if (bytes != MODS_ONLY)
+ num_ops++; /* copyup */
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
- if (!obj_req->osd_req)
- return -ENOMEM;
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+ if (IS_ERR(osd_req))
+ return PTR_ERR(osd_req);
if (bytes != MODS_ONLY) {
- ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
- "copyup");
+ ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
if (ret)
return ret;
-
- osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
- obj_req->copyup_bvecs,
- obj_req->copyup_bvec_count,
- bytes);
}
- switch (img_req->op_type) {
- case OBJ_OP_WRITE:
- __rbd_obj_setup_write(obj_req, which);
- break;
- case OBJ_OP_ZEROOUT:
- __rbd_obj_setup_zeroout(obj_req, which);
- break;
- default:
- BUG();
- }
+ rbd_osd_setup_write_ops(osd_req, which);
+ rbd_osd_format_write(osd_req);
- ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
if (ret)
return ret;
- rbd_obj_request_submit(obj_req);
+ rbd_osd_submit(osd_req);
return 0;
}
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
-{
- /*
- * Only send non-zero copyup data to save some I/O and network
- * bandwidth -- zero copyup data is equivalent to the object not
- * existing.
- */
- if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
- dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
- bytes = 0;
- }
-
- if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
- /*
- * Send a copyup request with an empty snapshot context to
- * deep-copyup the object through all existing snapshots.
- * A second request with the current snapshot context will be
- * sent for the actual modification.
- */
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
- return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
- }
-
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
- return rbd_obj_issue_copyup_ops(obj_req, bytes);
-}
-
static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
u32 i;
@@ -2608,7 +3222,12 @@ static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
return 0;
}
-static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+/*
+ * The target object doesn't exist. Read the data for the entire
+ * target object up to the overlap point (if any) from the parent,
+ * so we can use it for a copyup.
+ */
+static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
{
struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
int ret;
@@ -2623,178 +3242,492 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
* request -- pass MODS_ONLY since the copyup isn't needed
* anymore.
*/
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
- return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
+ return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
}
ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
if (ret)
return ret;
- obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
return rbd_obj_read_from_parent(obj_req);
}
-static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ struct ceph_snap_context *snapc = obj_req->img_request->snapc;
+ u8 new_state;
+ u32 i;
int ret;
- switch (obj_req->write_state) {
- case RBD_OBJ_WRITE_GUARD:
- rbd_assert(!obj_req->xferred);
- if (obj_req->result == -ENOENT) {
- /*
- * The target object doesn't exist. Read the data for
- * the entire target object up to the overlap point (if
- * any) from the parent, so we can use it for a copyup.
- */
- ret = rbd_obj_handle_write_guard(obj_req);
- if (ret) {
- obj_req->result = ret;
- return true;
- }
- return false;
+ rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+ return;
+
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+ return;
+
+ for (i = 0; i < snapc->num_snaps; i++) {
+ if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
+ i + 1 < snapc->num_snaps)
+ new_state = OBJECT_EXISTS_CLEAN;
+ else
+ new_state = OBJECT_EXISTS;
+
+ ret = rbd_object_map_update(obj_req, snapc->snaps[i],
+ new_state, NULL);
+ if (ret < 0) {
+ obj_req->pending.result = ret;
+ return;
}
- /* fall through */
- case RBD_OBJ_WRITE_FLAT:
- case RBD_OBJ_WRITE_COPYUP_OPS:
- if (!obj_req->result)
- /*
- * There is no such thing as a successful short
- * write -- indicate the whole request was satisfied.
- */
- obj_req->xferred = obj_req->ex.oe_len;
- return true;
- case RBD_OBJ_WRITE_READ_FROM_PARENT:
- if (obj_req->result)
- return true;
- rbd_assert(obj_req->xferred);
- ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+ rbd_assert(!ret);
+ obj_req->pending.num_pending++;
+ }
+}
+
+static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
+{
+ u32 bytes = rbd_obj_img_extents_bytes(obj_req);
+ int ret;
+
+ rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+ /*
+ * Only send non-zero copyup data to save some I/O and network
+ * bandwidth -- zero copyup data is equivalent to the object not
+ * existing.
+ */
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+ bytes = 0;
+
+ if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+ /*
+ * Send a copyup request with an empty snapshot context to
+ * deep-copyup the object through all existing snapshots.
+ * A second request with the current snapshot context will be
+ * sent for the actual modification.
+ */
+ ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
+ if (ret) {
+ obj_req->pending.result = ret;
+ return;
+ }
+
+ obj_req->pending.num_pending++;
+ bytes = MODS_ONLY;
+ }
+
+ ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
+ if (ret) {
+ obj_req->pending.result = ret;
+ return;
+ }
+
+ obj_req->pending.num_pending++;
+}
+
+static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ int ret;
+
+again:
+ switch (obj_req->copyup_state) {
+ case RBD_OBJ_COPYUP_START:
+ rbd_assert(!*result);
+
+ ret = rbd_obj_copyup_read_parent(obj_req);
if (ret) {
- obj_req->result = ret;
- obj_req->xferred = 0;
+ *result = ret;
return true;
}
+ if (obj_req->num_img_extents)
+ obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
+ else
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
return false;
- case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
- if (obj_req->result)
+ case RBD_OBJ_COPYUP_READ_PARENT:
+ if (*result)
return true;
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
- ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
- if (ret) {
- obj_req->result = ret;
+ if (is_zero_bvecs(obj_req->copyup_bvecs,
+ rbd_obj_img_extents_bytes(obj_req))) {
+ dout("%s %p detected zeros\n", __func__, obj_req);
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
+ }
+
+ rbd_obj_copyup_object_maps(obj_req);
+ if (!obj_req->pending.num_pending) {
+ *result = obj_req->pending.result;
+ obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
+ goto again;
+ }
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
+ return false;
+ case __RBD_OBJ_COPYUP_OBJECT_MAPS:
+ if (!pending_result_dec(&obj_req->pending, result))
+ return false;
+ /* fall through */
+ case RBD_OBJ_COPYUP_OBJECT_MAPS:
+ if (*result) {
+ rbd_warn(rbd_dev, "snap object map update failed: %d",
+ *result);
return true;
}
+
+ rbd_obj_copyup_write_object(obj_req);
+ if (!obj_req->pending.num_pending) {
+ *result = obj_req->pending.result;
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
+ goto again;
+ }
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
return false;
+ case __RBD_OBJ_COPYUP_WRITE_OBJECT:
+ if (!pending_result_dec(&obj_req->pending, result))
+ return false;
+ /* fall through */
+ case RBD_OBJ_COPYUP_WRITE_OBJECT:
+ return true;
default:
BUG();
}
}
/*
- * Returns true if @obj_req is completed, or false otherwise.
+ * Return:
+ * 0 - object map update sent
+ * 1 - object map update isn't needed
+ * <0 - error
*/
-static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
{
- switch (obj_req->img_request->op_type) {
- case OBJ_OP_READ:
- return rbd_obj_handle_read(obj_req);
- case OBJ_OP_WRITE:
- return rbd_obj_handle_write(obj_req);
- case OBJ_OP_DISCARD:
- case OBJ_OP_ZEROOUT:
- if (rbd_obj_handle_write(obj_req)) {
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ u8 current_state = OBJECT_PENDING;
+
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+ return 1;
+
+ if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
+ return 1;
+
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
+ &current_state);
+}
+
+static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ int ret;
+
+again:
+ switch (obj_req->write_state) {
+ case RBD_OBJ_WRITE_START:
+ rbd_assert(!*result);
+
+ if (rbd_obj_write_is_noop(obj_req))
+ return true;
+
+ ret = rbd_obj_write_pre_object_map(obj_req);
+ if (ret < 0) {
+ *result = ret;
+ return true;
+ }
+ obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
+ if (ret > 0)
+ goto again;
+ return false;
+ case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
+ if (*result) {
+ rbd_warn(rbd_dev, "pre object map update failed: %d",
+ *result);
+ return true;
+ }
+ ret = rbd_obj_write_object(obj_req);
+ if (ret) {
+ *result = ret;
+ return true;
+ }
+ obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
+ return false;
+ case RBD_OBJ_WRITE_OBJECT:
+ if (*result == -ENOENT) {
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+ *result = 0;
+ obj_req->copyup_state = RBD_OBJ_COPYUP_START;
+ obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
+ goto again;
+ }
/*
- * Hide -ENOENT from delete/truncate/zero -- discarding
- * a non-existent object is not a problem.
+ * On a non-existent object:
+ * delete - -ENOENT, truncate/zero - 0
*/
- if (obj_req->result == -ENOENT) {
- obj_req->result = 0;
- obj_req->xferred = obj_req->ex.oe_len;
- }
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+ *result = 0;
+ }
+ if (*result)
+ return true;
+
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+ goto again;
+ case __RBD_OBJ_WRITE_COPYUP:
+ if (!rbd_obj_advance_copyup(obj_req, result))
+ return false;
+ /* fall through */
+ case RBD_OBJ_WRITE_COPYUP:
+ if (*result) {
+ rbd_warn(rbd_dev, "copyup failed: %d", *result);
+ return true;
+ }
+ ret = rbd_obj_write_post_object_map(obj_req);
+ if (ret < 0) {
+ *result = ret;
return true;
}
+ obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
+ if (ret > 0)
+ goto again;
return false;
+ case RBD_OBJ_WRITE_POST_OBJECT_MAP:
+ if (*result)
+ rbd_warn(rbd_dev, "post object map update failed: %d",
+ *result);
+ return true;
default:
BUG();
}
}
-static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
+/*
+ * Return true if @obj_req is completed.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
+ int *result)
{
struct rbd_img_request *img_req = obj_req->img_request;
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+ bool done;
- rbd_assert((!obj_req->result &&
- obj_req->xferred == obj_req->ex.oe_len) ||
- (obj_req->result < 0 && !obj_req->xferred));
- if (!obj_req->result) {
- img_req->xferred += obj_req->xferred;
- return;
- }
+ mutex_lock(&obj_req->state_mutex);
+ if (!rbd_img_is_write(img_req))
+ done = rbd_obj_advance_read(obj_req, result);
+ else
+ done = rbd_obj_advance_write(obj_req, result);
+ mutex_unlock(&obj_req->state_mutex);
- rbd_warn(img_req->rbd_dev,
- "%s at objno %llu %llu~%llu result %d xferred %llu",
- obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
- obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
- obj_req->xferred);
- if (!img_req->result) {
- img_req->result = obj_req->result;
- img_req->xferred = 0;
+ if (done && *result) {
+ rbd_assert(*result < 0);
+ rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
+ obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
+ obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
}
+ return done;
}
-static void rbd_img_end_child_request(struct rbd_img_request *img_req)
+/*
+ * This is open-coded in rbd_img_handle_request() to avoid parent chain
+ * recursion.
+ */
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
{
- struct rbd_obj_request *obj_req = img_req->obj_request;
+ if (__rbd_obj_handle_request(obj_req, &result))
+ rbd_img_handle_request(obj_req->img_request, result);
+}
- rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
- rbd_assert((!img_req->result &&
- img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
- (img_req->result < 0 && !img_req->xferred));
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
+{
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
- obj_req->result = img_req->result;
- obj_req->xferred = img_req->xferred;
- rbd_img_request_put(img_req);
+ if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
+ return false;
+
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+ return false;
+
+ rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
+ if (rbd_dev->opts->lock_on_read ||
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+ return true;
+
+ return rbd_img_is_write(img_req);
}
-static void rbd_img_end_request(struct rbd_img_request *img_req)
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
{
- rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
- rbd_assert((!img_req->result &&
- img_req->xferred == blk_rq_bytes(img_req->rq)) ||
- (img_req->result < 0 && !img_req->xferred));
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+ bool locked;
+
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
+ locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
+ spin_lock(&rbd_dev->lock_lists_lock);
+ rbd_assert(list_empty(&img_req->lock_item));
+ if (!locked)
+ list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
+ else
+ list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
+ spin_unlock(&rbd_dev->lock_lists_lock);
+ return locked;
+}
+
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
+{
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+ bool need_wakeup;
- blk_mq_end_request(img_req->rq,
- errno_to_blk_status(img_req->result));
- rbd_img_request_put(img_req);
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
+ spin_lock(&rbd_dev->lock_lists_lock);
+ rbd_assert(!list_empty(&img_req->lock_item));
+ list_del_init(&img_req->lock_item);
+ need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
+ list_empty(&rbd_dev->running_list));
+ spin_unlock(&rbd_dev->lock_lists_lock);
+ if (need_wakeup)
+ complete(&rbd_dev->releasing_wait);
}
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
{
- struct rbd_img_request *img_req;
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+ if (!need_exclusive_lock(img_req))
+ return 1;
+
+ if (rbd_lock_add_request(img_req))
+ return 1;
+
+ if (rbd_dev->opts->exclusive) {
+ WARN_ON(1); /* lock got released? */
+ return -EROFS;
+ }
+
+ /*
+ * Note the use of mod_delayed_work() in rbd_acquire_lock()
+ * and cancel_delayed_work() in wake_lock_waiters().
+ */
+ dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+ return 0;
+}
+
+static void rbd_img_object_requests(struct rbd_img_request *img_req)
+{
+ struct rbd_obj_request *obj_req;
+
+ rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
+
+ for_each_obj_request(img_req, obj_req) {
+ int result = 0;
+
+ if (__rbd_obj_handle_request(obj_req, &result)) {
+ if (result) {
+ img_req->pending.result = result;
+ return;
+ }
+ } else {
+ img_req->pending.num_pending++;
+ }
+ }
+}
+
+static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
+{
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+ int ret;
again:
- if (!__rbd_obj_handle_request(obj_req))
- return;
+ switch (img_req->state) {
+ case RBD_IMG_START:
+ rbd_assert(!*result);
- img_req = obj_req->img_request;
- spin_lock(&img_req->completion_lock);
- rbd_obj_end_request(obj_req);
- rbd_assert(img_req->pending_count);
- if (--img_req->pending_count) {
- spin_unlock(&img_req->completion_lock);
- return;
+ ret = rbd_img_exclusive_lock(img_req);
+ if (ret < 0) {
+ *result = ret;
+ return true;
+ }
+ img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
+ if (ret > 0)
+ goto again;
+ return false;
+ case RBD_IMG_EXCLUSIVE_LOCK:
+ if (*result)
+ return true;
+
+ rbd_assert(!need_exclusive_lock(img_req) ||
+ __rbd_is_lock_owner(rbd_dev));
+
+ rbd_img_object_requests(img_req);
+ if (!img_req->pending.num_pending) {
+ *result = img_req->pending.result;
+ img_req->state = RBD_IMG_OBJECT_REQUESTS;
+ goto again;
+ }
+ img_req->state = __RBD_IMG_OBJECT_REQUESTS;
+ return false;
+ case __RBD_IMG_OBJECT_REQUESTS:
+ if (!pending_result_dec(&img_req->pending, result))
+ return false;
+ /* fall through */
+ case RBD_IMG_OBJECT_REQUESTS:
+ return true;
+ default:
+ BUG();
+ }
+}
+
+/*
+ * Return true if @img_req is completed.
+ */
+static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
+ int *result)
+{
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
+ bool done;
+
+ if (need_exclusive_lock(img_req)) {
+ down_read(&rbd_dev->lock_rwsem);
+ mutex_lock(&img_req->state_mutex);
+ done = rbd_img_advance(img_req, result);
+ if (done)
+ rbd_lock_del_request(img_req);
+ mutex_unlock(&img_req->state_mutex);
+ up_read(&rbd_dev->lock_rwsem);
+ } else {
+ mutex_lock(&img_req->state_mutex);
+ done = rbd_img_advance(img_req, result);
+ mutex_unlock(&img_req->state_mutex);
+ }
+
+ if (done && *result) {
+ rbd_assert(*result < 0);
+ rbd_warn(rbd_dev, "%s%s result %d",
+ test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
+ obj_op_name(img_req->op_type), *result);
}
+ return done;
+}
+
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
+{
+again:
+ if (!__rbd_img_handle_request(img_req, &result))
+ return;
- spin_unlock(&img_req->completion_lock);
if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
- obj_req = img_req->obj_request;
- rbd_img_end_child_request(img_req);
- goto again;
+ struct rbd_obj_request *obj_req = img_req->obj_request;
+
+ rbd_img_request_put(img_req);
+ if (__rbd_obj_handle_request(obj_req, &result)) {
+ img_req = obj_req->img_request;
+ goto again;
+ }
+ } else {
+ struct request *rq = img_req->rq;
+
+ rbd_img_request_put(img_req);
+ blk_mq_end_request(rq, errno_to_blk_status(result));
}
- rbd_img_end_request(img_req);
}
static const struct rbd_client_id rbd_empty_cid;
@@ -2839,6 +3772,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
{
struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+ rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
strcpy(rbd_dev->lock_cookie, cookie);
rbd_set_owner_cid(rbd_dev, &cid);
queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
@@ -2863,7 +3797,6 @@ static int rbd_lock(struct rbd_device *rbd_dev)
if (ret)
return ret;
- rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
__rbd_lock(rbd_dev, cookie);
return 0;
}
@@ -2882,7 +3815,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev)
ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, rbd_dev->lock_cookie);
if (ret && ret != -ENOENT)
- rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+ rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
/* treat errors as the image is unlocked */
rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
@@ -3009,15 +3942,34 @@ e_inval:
goto out;
}
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+/*
+ * Either image request state machine(s) or rbd_add_acquire_lock()
+ * (i.e. "rbd map").
+ */
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
{
- dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+ struct rbd_img_request *img_req;
+
+ dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+ lockdep_assert_held_write(&rbd_dev->lock_rwsem);
cancel_delayed_work(&rbd_dev->lock_dwork);
- if (wake_all)
- wake_up_all(&rbd_dev->lock_waitq);
- else
- wake_up(&rbd_dev->lock_waitq);
+ if (!completion_done(&rbd_dev->acquire_wait)) {
+ rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
+ list_empty(&rbd_dev->running_list));
+ rbd_dev->acquire_err = result;
+ complete_all(&rbd_dev->acquire_wait);
+ return;
+ }
+
+ list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
+ mutex_lock(&img_req->state_mutex);
+ rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
+ rbd_img_schedule(img_req, result);
+ mutex_unlock(&img_req->state_mutex);
+ }
+
+ list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
}
static int get_lock_owner_info(struct rbd_device *rbd_dev,
@@ -3132,13 +4084,10 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
goto again;
ret = find_watcher(rbd_dev, lockers);
- if (ret) {
- if (ret > 0)
- ret = 0; /* have to request lock */
- goto out;
- }
+ if (ret)
+ goto out; /* request lock or error */
- rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+ rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
ENTITY_NAME(lockers[0].id.name));
ret = ceph_monc_blacklist_add(&client->monc,
@@ -3165,53 +4114,90 @@ out:
return ret;
}
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
+ ret = rbd_object_map_open(rbd_dev);
+ if (ret)
+ return ret;
+ }
+
+ return 0;
+}
+
/*
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ * Return:
+ * 0 - lock acquired
+ * 1 - caller should call rbd_request_lock()
+ * <0 - error
*/
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
- int *pret)
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
{
- enum rbd_lock_state lock_state;
+ int ret;
down_read(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
if (__rbd_is_lock_owner(rbd_dev)) {
- lock_state = rbd_dev->lock_state;
up_read(&rbd_dev->lock_rwsem);
- return lock_state;
+ return 0;
}
up_read(&rbd_dev->lock_rwsem);
down_write(&rbd_dev->lock_rwsem);
dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
rbd_dev->lock_state);
- if (!__rbd_is_lock_owner(rbd_dev)) {
- *pret = rbd_try_lock(rbd_dev);
- if (*pret)
- rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+ if (__rbd_is_lock_owner(rbd_dev)) {
+ up_write(&rbd_dev->lock_rwsem);
+ return 0;
+ }
+
+ ret = rbd_try_lock(rbd_dev);
+ if (ret < 0) {
+ rbd_warn(rbd_dev, "failed to lock header: %d", ret);
+ if (ret == -EBLACKLISTED)
+ goto out;
+
+ ret = 1; /* request lock anyway */
+ }
+ if (ret > 0) {
+ up_write(&rbd_dev->lock_rwsem);
+ return ret;
+ }
+
+ rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
+ rbd_assert(list_empty(&rbd_dev->running_list));
+
+ ret = rbd_post_acquire_action(rbd_dev);
+ if (ret) {
+ rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
+ /*
+ * Can't stay in RBD_LOCK_STATE_LOCKED because
+ * rbd_lock_add_request() would let the request through,
+ * assuming that e.g. object map is locked and loaded.
+ */
+ rbd_unlock(rbd_dev);
}
- lock_state = rbd_dev->lock_state;
+out:
+ wake_lock_waiters(rbd_dev, ret);
up_write(&rbd_dev->lock_rwsem);
- return lock_state;
+ return ret;
}
static void rbd_acquire_lock(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, lock_dwork);
- enum rbd_lock_state lock_state;
- int ret = 0;
+ int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
again:
- lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
- if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
- if (lock_state == RBD_LOCK_STATE_LOCKED)
- wake_requests(rbd_dev, true);
- dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
- rbd_dev, lock_state, ret);
+ ret = rbd_try_acquire_lock(rbd_dev);
+ if (ret <= 0) {
+ dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
return;
}
@@ -3220,16 +4206,9 @@ again:
goto again; /* treat this as a dead client */
} else if (ret == -EROFS) {
rbd_warn(rbd_dev, "peer will not release lock");
- /*
- * If this is rbd_add_acquire_lock(), we want to fail
- * immediately -- reuse BLACKLISTED flag. Otherwise we
- * want to block.
- */
- if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
- /* wake "rbd map --exclusive" process */
- wake_requests(rbd_dev, false);
- }
+ down_write(&rbd_dev->lock_rwsem);
+ wake_lock_waiters(rbd_dev, ret);
+ up_write(&rbd_dev->lock_rwsem);
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3246,43 +4225,67 @@ again:
}
}
-/*
- * lock_rwsem must be held for write
- */
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
{
- dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
- rbd_dev->lock_state);
+ bool need_wait;
+
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
+ lockdep_assert_held_write(&rbd_dev->lock_rwsem);
+
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
return false;
- rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
- downgrade_write(&rbd_dev->lock_rwsem);
/*
* Ensure that all in-flight IO is flushed.
- *
- * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
- * may be shared with other devices.
*/
- ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+ rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+ rbd_assert(!completion_done(&rbd_dev->releasing_wait));
+ need_wait = !list_empty(&rbd_dev->running_list);
+ downgrade_write(&rbd_dev->lock_rwsem);
+ if (need_wait)
+ wait_for_completion(&rbd_dev->releasing_wait);
up_read(&rbd_dev->lock_rwsem);
down_write(&rbd_dev->lock_rwsem);
- dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
- rbd_dev->lock_state);
if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
return false;
+ rbd_assert(list_empty(&rbd_dev->running_list));
+ return true;
+}
+
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
+{
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
+ rbd_object_map_close(rbd_dev);
+}
+
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
+{
+ rbd_assert(list_empty(&rbd_dev->running_list));
+
+ rbd_pre_release_action(rbd_dev);
rbd_unlock(rbd_dev);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_release_lock(struct rbd_device *rbd_dev)
+{
+ if (!rbd_quiesce_lock(rbd_dev))
+ return;
+
+ __rbd_release_lock(rbd_dev);
+
/*
* Give others a chance to grab the lock - we would re-acquire
- * almost immediately if we got new IO during ceph_osdc_sync()
- * otherwise. We need to ack our own notifications, so this
- * lock_dwork will be requeued from rbd_wait_state_locked()
- * after wake_requests() in rbd_handle_released_lock().
+ * almost immediately if we got new IO while draining the running
+ * list otherwise. We need to ack our own notifications, so this
+ * lock_dwork will be requeued from rbd_handle_released_lock() by
+ * way of maybe_kick_acquire().
*/
cancel_delayed_work(&rbd_dev->lock_dwork);
- return true;
}
static void rbd_release_lock_work(struct work_struct *work)
@@ -3295,6 +4298,23 @@ static void rbd_release_lock_work(struct work_struct *work)
up_write(&rbd_dev->lock_rwsem);
}
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
+{
+ bool have_requests;
+
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
+ if (__rbd_is_lock_owner(rbd_dev))
+ return;
+
+ spin_lock(&rbd_dev->lock_lists_lock);
+ have_requests = !list_empty(&rbd_dev->acquiring_list);
+ spin_unlock(&rbd_dev->lock_lists_lock);
+ if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
+ dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
+ mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+ }
+}
+
static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
void **p)
{
@@ -3324,8 +4344,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
down_read(&rbd_dev->lock_rwsem);
}
- if (!__rbd_is_lock_owner(rbd_dev))
- wake_requests(rbd_dev, false);
+ maybe_kick_acquire(rbd_dev);
up_read(&rbd_dev->lock_rwsem);
}
@@ -3357,8 +4376,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
down_read(&rbd_dev->lock_rwsem);
}
- if (!__rbd_is_lock_owner(rbd_dev))
- wake_requests(rbd_dev, false);
+ maybe_kick_acquire(rbd_dev);
up_read(&rbd_dev->lock_rwsem);
}
@@ -3608,7 +4626,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
- WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
cancel_tasks_sync(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
@@ -3630,7 +4647,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
char cookie[32];
int ret;
- WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+ if (!rbd_quiesce_lock(rbd_dev))
+ return;
format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
@@ -3646,11 +4664,11 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
* Lock cookie cannot be updated on older OSDs, so do
* a manual release and queue an acquire.
*/
- if (rbd_release_lock(rbd_dev))
- queue_delayed_work(rbd_dev->task_wq,
- &rbd_dev->lock_dwork, 0);
+ __rbd_release_lock(rbd_dev);
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
} else {
__rbd_lock(rbd_dev, cookie);
+ wake_lock_waiters(rbd_dev, 0);
}
}
@@ -3671,15 +4689,18 @@ static void rbd_reregister_watch(struct work_struct *work)
ret = __rbd_register_watch(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
- if (ret == -EBLACKLISTED || ret == -ENOENT) {
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
- wake_requests(rbd_dev, true);
- } else {
+ if (ret != -EBLACKLISTED && ret != -ENOENT) {
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
+ mutex_unlock(&rbd_dev->watch_mutex);
+ return;
}
+
mutex_unlock(&rbd_dev->watch_mutex);
+ down_write(&rbd_dev->lock_rwsem);
+ wake_lock_waiters(rbd_dev, ret);
+ up_write(&rbd_dev->lock_rwsem);
return;
}
@@ -3742,7 +4763,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
CEPH_OSD_FLAG_READ, req_page, outbound_size,
- reply_page, &inbound_size);
+ &reply_page, &inbound_size);
if (!ret) {
memcpy(inbound, page_address(reply_page), inbound_size);
ret = inbound_size;
@@ -3754,54 +4775,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
return ret;
}
-/*
- * lock_rwsem must be held for read
- */
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
-{
- DEFINE_WAIT(wait);
- unsigned long timeout;
- int ret = 0;
-
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
- return -EBLACKLISTED;
-
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
- return 0;
-
- if (!may_acquire) {
- rbd_warn(rbd_dev, "exclusive lock required");
- return -EROFS;
- }
-
- do {
- /*
- * Note the use of mod_delayed_work() in rbd_acquire_lock()
- * and cancel_delayed_work() in wake_requests().
- */
- dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
- queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
- prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
- TASK_UNINTERRUPTIBLE);
- up_read(&rbd_dev->lock_rwsem);
- timeout = schedule_timeout(ceph_timeout_jiffies(
- rbd_dev->opts->lock_timeout));
- down_read(&rbd_dev->lock_rwsem);
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
- ret = -EBLACKLISTED;
- break;
- }
- if (!timeout) {
- rbd_warn(rbd_dev, "timed out waiting for lock");
- ret = -ETIMEDOUT;
- break;
- }
- } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
-
- finish_wait(&rbd_dev->lock_waitq, &wait);
- return ret;
-}
-
static void rbd_queue_workfn(struct work_struct *work)
{
struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3812,7 +4785,6 @@ static void rbd_queue_workfn(struct work_struct *work)
u64 length = blk_rq_bytes(rq);
enum obj_operation_type op_type;
u64 mapping_size;
- bool must_be_locked;
int result;
switch (req_op(rq)) {
@@ -3886,21 +4858,10 @@ static void rbd_queue_workfn(struct work_struct *work)
goto err_rq;
}
- must_be_locked =
- (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
- (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
- if (must_be_locked) {
- down_read(&rbd_dev->lock_rwsem);
- result = rbd_wait_state_locked(rbd_dev,
- !rbd_dev->opts->exclusive);
- if (result)
- goto err_unlock;
- }
-
img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
if (!img_request) {
result = -ENOMEM;
- goto err_unlock;
+ goto err_rq;
}
img_request->rq = rq;
snapc = NULL; /* img_request consumes a ref */
@@ -3910,19 +4871,14 @@ static void rbd_queue_workfn(struct work_struct *work)
else
result = rbd_img_fill_from_bio(img_request, offset, length,
rq->bio);
- if (result || !img_request->pending_count)
+ if (result)
goto err_img_request;
- rbd_img_request_submit(img_request);
- if (must_be_locked)
- up_read(&rbd_dev->lock_rwsem);
+ rbd_img_handle_request(img_request, 0);
return;
err_img_request:
rbd_img_request_put(img_request);
-err_unlock:
- if (must_be_locked)
- up_read(&rbd_dev->lock_rwsem);
err_rq:
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -4589,7 +5545,13 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
- init_waitqueue_head(&rbd_dev->lock_waitq);
+ spin_lock_init(&rbd_dev->lock_lists_lock);
+ INIT_LIST_HEAD(&rbd_dev->acquiring_list);
+ INIT_LIST_HEAD(&rbd_dev->running_list);
+ init_completion(&rbd_dev->acquire_wait);
+ init_completion(&rbd_dev->releasing_wait);
+
+ spin_lock_init(&rbd_dev->object_map_lock);
rbd_dev->dev.bus = &rbd_bus_type;
rbd_dev->dev.type = &rbd_device_type;
@@ -4772,6 +5734,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
&rbd_dev->header.features);
}
+/*
+ * These are generic image flags, but since they are used only for
+ * object map, store them in rbd_dev->object_map_flags.
+ *
+ * For the same reason, this function is called only on object map
+ * (re)load and not on header refresh.
+ */
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
+{
+ __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
+ __le64 flags;
+ int ret;
+
+ ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+ &rbd_dev->header_oloc, "get_flags",
+ &snapid, sizeof(snapid),
+ &flags, sizeof(flags));
+ if (ret < 0)
+ return ret;
+ if (ret < sizeof(flags))
+ return -EBADMSG;
+
+ rbd_dev->object_map_flags = le64_to_cpu(flags);
+ return 0;
+}
+
struct parent_image_info {
u64 pool_id;
const char *pool_ns;
@@ -4829,7 +5817,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply_page, &reply_len);
+ req_page, sizeof(u64), &reply_page, &reply_len);
if (ret)
return ret == -EOPNOTSUPP ? 1 : ret;
@@ -4841,7 +5829,7 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply_page, &reply_len);
+ req_page, sizeof(u64), &reply_page, &reply_len);
if (ret)
return ret;
@@ -4872,7 +5860,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
"rbd", "get_parent", CEPH_OSD_FLAG_READ,
- req_page, sizeof(u64), reply_page, &reply_len);
+ req_page, sizeof(u64), &reply_page, &reply_len);
if (ret)
return ret;
@@ -5605,28 +6593,49 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
{
down_write(&rbd_dev->lock_rwsem);
if (__rbd_is_lock_owner(rbd_dev))
- rbd_unlock(rbd_dev);
+ __rbd_release_lock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
}
+/*
+ * If the wait is interrupted, an error is returned even if the lock
+ * was successfully acquired. rbd_dev_image_unlock() will release it
+ * if needed.
+ */
static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
{
- int ret;
+ long ret;
if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+ if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
+ return 0;
+
rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
return -EINVAL;
}
- /* FIXME: "rbd map --exclusive" should be in interruptible */
- down_read(&rbd_dev->lock_rwsem);
- ret = rbd_wait_state_locked(rbd_dev, true);
- up_read(&rbd_dev->lock_rwsem);
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+ return 0;
+
+ rbd_assert(!rbd_is_lock_owner(rbd_dev));
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+ ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
+ ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
+ if (ret > 0)
+ ret = rbd_dev->acquire_err;
+ else if (!ret)
+ ret = -ETIMEDOUT;
+
if (ret) {
- rbd_warn(rbd_dev, "failed to acquire exclusive lock");
- return -EROFS;
+ rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
+ return ret;
}
+ /*
+ * The lock may have been released by now, unless automatic lock
+ * transitions are disabled.
+ */
+ rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
return 0;
}
@@ -5724,6 +6733,8 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
struct rbd_image_header *header;
rbd_dev_parent_put(rbd_dev);
+ rbd_object_map_free(rbd_dev);
+ rbd_dev_mapping_clear(rbd_dev);
/* Free dynamic fields from the header, then zero it out */
@@ -5824,7 +6835,6 @@ out_err:
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
- rbd_dev_mapping_clear(rbd_dev);
rbd_free_disk(rbd_dev);
if (!single_major)
unregister_blkdev(rbd_dev->major, rbd_dev->name);
@@ -5858,23 +6868,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
if (ret)
goto err_out_blkdev;
- ret = rbd_dev_mapping_set(rbd_dev);
- if (ret)
- goto err_out_disk;
-
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
if (ret)
- goto err_out_mapping;
+ goto err_out_disk;
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
up_write(&rbd_dev->header_rwsem);
return 0;
-err_out_mapping:
- rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
rbd_free_disk(rbd_dev);
err_out_blkdev:
@@ -5975,6 +6979,17 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
goto err_out_probe;
}
+ ret = rbd_dev_mapping_set(rbd_dev);
+ if (ret)
+ goto err_out_probe;
+
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP &&
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
+ ret = rbd_object_map_load(rbd_dev);
+ if (ret)
+ goto err_out_probe;
+ }
+
if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
@@ -6071,11 +7086,9 @@ static ssize_t do_rbd_add(struct bus_type *bus,
if (rc)
goto err_out_image_probe;
- if (rbd_dev->opts->exclusive) {
- rc = rbd_add_acquire_lock(rbd_dev);
- if (rc)
- goto err_out_device_setup;
- }
+ rc = rbd_add_acquire_lock(rbd_dev);
+ if (rc)
+ goto err_out_image_lock;
/* Everything's ready. Announce the disk to the world. */
@@ -6101,7 +7114,6 @@ out:
err_out_image_lock:
rbd_dev_image_unlock(rbd_dev);
-err_out_device_setup:
rbd_dev_device_release(rbd_dev);
err_out_image_probe:
rbd_dev_image_release(rbd_dev);
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 62ff50d3e7a6..ac98ab6ccd3b 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -18,6 +18,7 @@
/* For format version 2, rbd image 'foo' consists of objects
* rbd_id.foo - id of image
* rbd_header.<id> - image metadata
+ * rbd_object_map.<id> - optional image object map
* rbd_data.<id>.0000000000000000
* rbd_data.<id>.0000000000000001
* ... - data
@@ -25,6 +26,7 @@
*/
#define RBD_HEADER_PREFIX "rbd_header."
+#define RBD_OBJECT_MAP_PREFIX "rbd_object_map."
#define RBD_ID_PREFIX "rbd_id."
#define RBD_V2_DATA_FORMAT "%s.%016llx"
@@ -39,6 +41,14 @@ enum rbd_notify_op {
RBD_NOTIFY_OP_HEADER_UPDATE = 3,
};
+#define OBJECT_NONEXISTENT 0
+#define OBJECT_EXISTS 1
+#define OBJECT_PENDING 2
+#define OBJECT_EXISTS_CLEAN 3
+
+#define RBD_FLAG_OBJECT_MAP_INVALID (1ULL << 0)
+#define RBD_FLAG_FAST_DIFF_INVALID (1ULL << 1)
+
/*
* For format version 1, rbd image 'foo' consists of objects
* foo.rbd - image metadata
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 7f7d92d6b024..cf235f6eacf9 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -36,3 +36,15 @@ config CEPH_FS_POSIX_ACL
groups beyond the owner/group/world scheme.
If you don't know what Access Control Lists are, say N
+
+config CEPH_FS_SECURITY_LABEL
+ bool "CephFS Security Labels"
+ depends on CEPH_FS && SECURITY
+ help
+ Security labels support alternative access control models
+ implemented by security modules like SELinux. This option
+ enables an extended attribute handler for file security
+ labels in the Ceph filesystem.
+
+ If you are not using a security module that requires using
+ extended attributes for file security labels, say N.
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 8a19c249036c..aa55f412a6e3 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -159,7 +159,7 @@ out:
}
int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
- struct ceph_acls_info *info)
+ struct ceph_acl_sec_ctx *as_ctx)
{
struct posix_acl *acl, *default_acl;
size_t val_size1 = 0, val_size2 = 0;
@@ -234,9 +234,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
kfree(tmp_buf);
- info->acl = acl;
- info->default_acl = default_acl;
- info->pagelist = pagelist;
+ as_ctx->acl = acl;
+ as_ctx->default_acl = default_acl;
+ as_ctx->pagelist = pagelist;
return 0;
out_err:
@@ -248,18 +248,10 @@ out_err:
return err;
}
-void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
+void ceph_init_inode_acls(struct inode *inode, struct ceph_acl_sec_ctx *as_ctx)
{
if (!inode)
return;
- ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
- ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
-}
-
-void ceph_release_acls_info(struct ceph_acls_info *info)
-{
- posix_acl_release(info->acl);
- posix_acl_release(info->default_acl);
- if (info->pagelist)
- ceph_pagelist_release(info->pagelist);
+ ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, as_ctx->acl);
+ ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, as_ctx->default_acl);
}
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index a47c541f8006..e078cc55b989 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -10,6 +10,7 @@
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
@@ -1576,6 +1577,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
/* Update time before taking page lock */
file_update_time(vma->vm_file);
+ inode_inc_iversion_raw(inode);
do {
lock_page(page);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 0176241eaea7..d98dcd976c80 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -8,6 +8,7 @@
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
@@ -1138,8 +1139,9 @@ struct cap_msg_args {
u64 ino, cid, follows;
u64 flush_tid, oldest_flush_tid, size, max_size;
u64 xattr_version;
+ u64 change_attr;
struct ceph_buffer *xattr_buf;
- struct timespec64 atime, mtime, ctime;
+ struct timespec64 atime, mtime, ctime, btime;
int op, caps, wanted, dirty;
u32 seq, issue_seq, mseq, time_warp_seq;
u32 flags;
@@ -1160,7 +1162,6 @@ static int send_cap_msg(struct cap_msg_args *arg)
struct ceph_msg *msg;
void *p;
size_t extra_len;
- struct timespec64 zerotime = {0};
struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
@@ -1245,15 +1246,10 @@ static int send_cap_msg(struct cap_msg_args *arg)
/* pool namespace (version 8) (mds always ignores this) */
ceph_encode_32(&p, 0);
- /*
- * btime and change_attr (version 9)
- *
- * We just zero these out for now, as the MDS ignores them unless
- * the requisite feature flags are set (which we don't do yet).
- */
- ceph_encode_timespec64(p, &zerotime);
+ /* btime and change_attr (version 9) */
+ ceph_encode_timespec64(p, &arg->btime);
p += sizeof(struct ceph_timespec);
- ceph_encode_64(&p, 0);
+ ceph_encode_64(&p, arg->change_attr);
/* Advisory flags (version 10) */
ceph_encode_32(&p, arg->flags);
@@ -1263,20 +1259,22 @@ static int send_cap_msg(struct cap_msg_args *arg)
}
/*
- * Queue cap releases when an inode is dropped from our cache. Since
- * inode is about to be destroyed, there is no need for i_ceph_lock.
+ * Queue cap releases when an inode is dropped from our cache.
*/
-void __ceph_remove_caps(struct inode *inode)
+void __ceph_remove_caps(struct ceph_inode_info *ci)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;
+ /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
+ * may call __ceph_caps_issued_mask() on a freeing inode. */
+ spin_lock(&ci->i_ceph_lock);
p = rb_first(&ci->i_caps);
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
p = rb_next(p);
__ceph_remove_cap(cap, true);
}
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -1297,7 +1295,7 @@ void __ceph_remove_caps(struct inode *inode)
* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, bool sync, int used, int want, int retain,
+ int op, int flags, int used, int want, int retain,
int flushing, u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
@@ -1377,6 +1375,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
arg.mtime = inode->i_mtime;
arg.atime = inode->i_atime;
arg.ctime = inode->i_ctime;
+ arg.btime = ci->i_btime;
+ arg.change_attr = inode_peek_iversion_raw(inode);
arg.op = op;
arg.caps = cap->implemented;
@@ -1393,12 +1393,19 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
arg.mode = inode->i_mode;
arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
- if (list_empty(&ci->i_cap_snaps))
- arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
- else
- arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
- if (sync)
- arg.flags |= CEPH_CLIENT_CAPS_SYNC;
+ if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
+ !list_empty(&ci->i_cap_snaps)) {
+ struct ceph_cap_snap *capsnap;
+ list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
+ if (capsnap->cap_flush.tid)
+ break;
+ if (capsnap->need_flush) {
+ flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
+ break;
+ }
+ }
+ }
+ arg.flags = flags;
spin_unlock(&ci->i_ceph_lock);
@@ -1436,6 +1443,8 @@ static inline int __send_flush_snap(struct inode *inode,
arg.atime = capsnap->atime;
arg.mtime = capsnap->mtime;
arg.ctime = capsnap->ctime;
+ arg.btime = capsnap->btime;
+ arg.change_attr = capsnap->change_attr;
arg.op = CEPH_CAP_OP_FLUSHSNAP;
arg.caps = capsnap->issued;
@@ -1603,10 +1612,8 @@ retry:
}
// make sure flushsnap messages are sent in proper order.
- if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+ if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
__kick_flushing_caps(mdsc, session, ci, 0);
- ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
- }
__ceph_flush_snaps(ci, session);
out:
@@ -2048,10 +2055,8 @@ ack:
if (cap == ci->i_auth_cap &&
(ci->i_ceph_flags &
(CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
- if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+ if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
__kick_flushing_caps(mdsc, session, ci, 0);
- ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
- }
if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
__ceph_flush_snaps(ci, session);
@@ -2087,7 +2092,7 @@ ack:
sent++;
/* __send_cap drops i_ceph_lock */
- delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
+ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0,
cap_used, want, retain, flushing,
flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
@@ -2121,6 +2126,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
retry:
spin_lock(&ci->i_ceph_lock);
+retry_locked:
if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
spin_unlock(&ci->i_ceph_lock);
dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
@@ -2128,8 +2134,6 @@ retry:
}
if (ci->i_dirty_caps && ci->i_auth_cap) {
struct ceph_cap *cap = ci->i_auth_cap;
- int used = __ceph_caps_used(ci);
- int want = __ceph_caps_wanted(ci);
int delayed;
if (!session || session != cap->session) {
@@ -2145,13 +2149,25 @@ retry:
goto out;
}
+ if (ci->i_ceph_flags &
+ (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
+ if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
+ __kick_flushing_caps(mdsc, session, ci, 0);
+ if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+ __ceph_flush_snaps(ci, session);
+ goto retry_locked;
+ }
+
flushing = __mark_caps_flushing(inode, session, true,
&flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
- delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
- used, want, (cap->issued | cap->implemented),
- flushing, flush_tid, oldest_flush_tid);
+ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+ CEPH_CLIENT_CAPS_SYNC,
+ __ceph_caps_used(ci),
+ __ceph_caps_wanted(ci),
+ (cap->issued | cap->implemented),
+ flushing, flush_tid, oldest_flush_tid);
if (delayed) {
spin_lock(&ci->i_ceph_lock);
@@ -2320,6 +2336,16 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_flush *cf;
int ret;
u64 first_tid = 0;
+ u64 last_snap_flush = 0;
+
+ ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+
+ list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
+ if (!cf->caps) {
+ last_snap_flush = cf->tid;
+ break;
+ }
+ }
list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
if (cf->tid < first_tid)
@@ -2338,10 +2364,13 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
dout("kick_flushing_caps %p cap %p tid %llu %s\n",
inode, cap, cf->tid, ceph_cap_string(cf->caps));
ci->i_ceph_flags |= CEPH_I_NODELAY;
+
ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
- false, __ceph_caps_used(ci),
+ (cf->tid < last_snap_flush ?
+ CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
+ __ceph_caps_used(ci),
__ceph_caps_wanted(ci),
- cap->issued | cap->implemented,
+ (cap->issued | cap->implemented),
cf->caps, cf->tid, oldest_flush_tid);
if (ret) {
pr_err("kick_flushing_caps: error sending "
@@ -2410,7 +2439,6 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
*/
if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) {
- ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
/* encode_caps_cb() also will reset these sequence
* numbers. make sure sequence numbers in cap flush
* message match later reconnect message */
@@ -2450,7 +2478,6 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
continue;
}
if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
- ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
__kick_flushing_caps(mdsc, session, ci,
oldest_flush_tid);
}
@@ -2478,7 +2505,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);
- ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
spin_unlock(&ci->i_ceph_lock);
} else {
@@ -3040,8 +3066,10 @@ struct cap_extra_info {
bool dirstat_valid;
u64 nfiles;
u64 nsubdirs;
+ u64 change_attr;
/* currently issued */
int issued;
+ struct timespec64 btime;
};
/*
@@ -3123,11 +3151,14 @@ static void handle_cap_grant(struct inode *inode,
__check_cap_issue(ci, cap, newcaps);
+ inode_set_max_iversion_raw(inode, extra_info->change_attr);
+
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(grant->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
+ ci->i_btime = extra_info->btime;
dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
from_kuid(&init_user_ns, inode->i_uid),
from_kgid(&init_user_ns, inode->i_gid));
@@ -3154,6 +3185,7 @@ static void handle_cap_grant(struct inode *inode,
ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
ci->i_xattrs.version = version;
ceph_forget_all_cached_acls(inode);
+ ceph_security_invalidate_secctx(inode);
}
}
@@ -3848,17 +3880,19 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
}
- if (msg_version >= 11) {
+ if (msg_version >= 9) {
struct ceph_timespec *btime;
- u64 change_attr;
- u32 flags;
- /* version >= 9 */
if (p + sizeof(*btime) > end)
goto bad;
btime = p;
+ ceph_decode_timespec64(&extra_info.btime, btime);
p += sizeof(*btime);
- ceph_decode_64_safe(&p, end, change_attr, bad);
+ ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
+ }
+
+ if (msg_version >= 11) {
+ u32 flags;
/* version >= 10 */
ceph_decode_32_safe(&p, end, flags, bad);
/* version >= 11 */
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 83cd41fa2b01..2eb88ed22993 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -52,7 +52,7 @@ static int mdsc_show(struct seq_file *s, void *p)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
struct rb_node *rp;
- int pathlen;
+ int pathlen = 0;
u64 pathbase;
char *path;
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 0637149fb9f9..aab29f48c62d 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -825,7 +825,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
- struct ceph_acls_info acls = {};
+ struct ceph_acl_sec_ctx as_ctx = {};
int err;
if (ceph_snap(dir) != CEPH_NOSNAP)
@@ -836,7 +836,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
goto out;
}
- err = ceph_pre_init_acls(dir, &mode, &acls);
+ err = ceph_pre_init_acls(dir, &mode, &as_ctx);
+ if (err < 0)
+ goto out;
+ err = ceph_security_init_secctx(dentry, mode, &as_ctx);
if (err < 0)
goto out;
@@ -855,9 +858,9 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
req->r_args.mknod.rdev = cpu_to_le32(rdev);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (acls.pagelist) {
- req->r_pagelist = acls.pagelist;
- acls.pagelist = NULL;
+ if (as_ctx.pagelist) {
+ req->r_pagelist = as_ctx.pagelist;
+ as_ctx.pagelist = NULL;
}
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
@@ -865,10 +868,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
ceph_mdsc_put_request(req);
out:
if (!err)
- ceph_init_inode_acls(d_inode(dentry), &acls);
+ ceph_init_inode_acls(d_inode(dentry), &as_ctx);
else
d_drop(dentry);
- ceph_release_acls_info(&acls);
+ ceph_release_acl_sec_ctx(&as_ctx);
return err;
}
@@ -884,6 +887,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
+ struct ceph_acl_sec_ctx as_ctx = {};
int err;
if (ceph_snap(dir) != CEPH_NOSNAP)
@@ -894,6 +898,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
goto out;
}
+ err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx);
+ if (err < 0)
+ goto out;
+
dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
if (IS_ERR(req)) {
@@ -919,6 +927,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
out:
if (err)
d_drop(dentry);
+ ceph_release_acl_sec_ctx(&as_ctx);
return err;
}
@@ -927,7 +936,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
- struct ceph_acls_info acls = {};
+ struct ceph_acl_sec_ctx as_ctx = {};
int err = -EROFS;
int op;
@@ -950,7 +959,10 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
}
mode |= S_IFDIR;
- err = ceph_pre_init_acls(dir, &mode, &acls);
+ err = ceph_pre_init_acls(dir, &mode, &as_ctx);
+ if (err < 0)
+ goto out;
+ err = ceph_security_init_secctx(dentry, mode, &as_ctx);
if (err < 0)
goto out;
@@ -967,9 +979,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
req->r_args.mkdir.mode = cpu_to_le32(mode);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (acls.pagelist) {
- req->r_pagelist = acls.pagelist;
- acls.pagelist = NULL;
+ if (as_ctx.pagelist) {
+ req->r_pagelist = as_ctx.pagelist;
+ as_ctx.pagelist = NULL;
}
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err &&
@@ -979,10 +991,10 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
ceph_mdsc_put_request(req);
out:
if (!err)
- ceph_init_inode_acls(d_inode(dentry), &acls);
+ ceph_init_inode_acls(d_inode(dentry), &as_ctx);
else
d_drop(dentry);
- ceph_release_acls_info(&acls);
+ ceph_release_acl_sec_ctx(&as_ctx);
return err;
}
@@ -1433,8 +1445,7 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
return false;
}
-static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
- struct inode *dir)
+static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *session = NULL;
@@ -1466,7 +1477,7 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
spin_unlock(&dentry->d_lock);
if (session) {
- ceph_mdsc_lease_send_msg(session, dir, dentry,
+ ceph_mdsc_lease_send_msg(session, dentry,
CEPH_MDS_LEASE_RENEW, seq);
ceph_put_mds_session(session);
}
@@ -1512,18 +1523,26 @@ static int __dir_lease_try_check(const struct dentry *dentry)
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
{
struct ceph_inode_info *ci = ceph_inode(dir);
- struct ceph_dentry_info *di = ceph_dentry(dentry);
- int valid = 0;
+ int valid;
+ int shared_gen;
spin_lock(&ci->i_ceph_lock);
- if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
- valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
+ valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
+ shared_gen = atomic_read(&ci->i_shared_gen);
spin_unlock(&ci->i_ceph_lock);
- if (valid)
- __ceph_dentry_dir_lease_touch(di);
- dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
- dir, (unsigned)atomic_read(&ci->i_shared_gen),
- dentry, (unsigned)di->lease_shared_gen, valid);
+ if (valid) {
+ struct ceph_dentry_info *di;
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ if (dir == d_inode(dentry->d_parent) &&
+ di && di->lease_shared_gen == shared_gen)
+ __ceph_dentry_dir_lease_touch(di);
+ else
+ valid = 0;
+ spin_unlock(&dentry->d_lock);
+ }
+ dout("dir_lease_is_valid dir %p v%u dentry %p = %d\n",
+ dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, valid);
return valid;
}
@@ -1558,7 +1577,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
valid = 1;
} else {
- valid = dentry_lease_is_valid(dentry, flags, dir);
+ valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
if (valid || dir_lease_is_valid(dir, dentry)) {
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index d3ef7ee429ec..15ff1b09cfa2 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -368,7 +368,7 @@ static struct dentry *ceph_get_parent(struct dentry *child)
}
out:
dout("get_parent %p ino %llx.%llx err=%ld\n",
- child, ceph_vinop(inode), (IS_ERR(dn) ? PTR_ERR(dn) : 0));
+ child, ceph_vinop(inode), (long)PTR_ERR_OR_ZERO(dn));
return dn;
}
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index c5517ffeb11c..685a03cc4b77 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -10,6 +10,7 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
@@ -437,7 +438,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
struct dentry *dn;
- struct ceph_acls_info acls = {};
+ struct ceph_acl_sec_ctx as_ctx = {};
int mask;
int err;
@@ -451,25 +452,28 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
if (flags & O_CREAT) {
if (ceph_quota_is_max_files_exceeded(dir))
return -EDQUOT;
- err = ceph_pre_init_acls(dir, &mode, &acls);
+ err = ceph_pre_init_acls(dir, &mode, &as_ctx);
if (err < 0)
return err;
+ err = ceph_security_init_secctx(dentry, mode, &as_ctx);
+ if (err < 0)
+ goto out_ctx;
}
/* do the open */
req = prepare_open_request(dir->i_sb, flags, mode);
if (IS_ERR(req)) {
err = PTR_ERR(req);
- goto out_acl;
+ goto out_ctx;
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
if (flags & O_CREAT) {
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
- if (acls.pagelist) {
- req->r_pagelist = acls.pagelist;
- acls.pagelist = NULL;
+ if (as_ctx.pagelist) {
+ req->r_pagelist = as_ctx.pagelist;
+ as_ctx.pagelist = NULL;
}
}
@@ -507,7 +511,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
} else {
dout("atomic_open finish_open on dn %p\n", dn);
if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
- ceph_init_inode_acls(d_inode(dentry), &acls);
+ ceph_init_inode_acls(d_inode(dentry), &as_ctx);
file->f_mode |= FMODE_CREATED;
}
err = finish_open(file, dentry, ceph_open);
@@ -516,8 +520,8 @@ out_req:
if (!req->r_err && req->r_target_inode)
ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
ceph_mdsc_put_request(req);
-out_acl:
- ceph_release_acls_info(&acls);
+out_ctx:
+ ceph_release_acl_sec_ctx(&as_ctx);
dout("atomic_open result=%d\n", err);
return err;
}
@@ -1007,7 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
* may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
- (pos+len) | (PAGE_SIZE - 1));
+ PAGE_ALIGN(pos + len) - 1);
req->r_mtime = mtime;
}
@@ -1022,7 +1026,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
- list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+ list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
pos += len;
continue;
@@ -1082,8 +1086,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
while (!list_empty(&osd_reqs)) {
req = list_first_entry(&osd_reqs,
struct ceph_osd_request,
- r_unsafe_item);
- list_del_init(&req->r_unsafe_item);
+ r_private_item);
+ list_del_init(&req->r_private_item);
if (ret >= 0)
ret = ceph_osdc_start_request(req->r_osdc,
req, false);
@@ -1432,6 +1436,8 @@ retry_snap:
if (err)
goto out;
+ inode_inc_iversion_raw(inode);
+
if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
@@ -2063,6 +2069,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
do_final_copy = true;
file_update_time(dst_file);
+ inode_inc_iversion_raw(dst_inode);
+
if (endoff > size) {
int caps_flags = 0;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 761451f36e2d..791f84a13bb8 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -13,6 +13,7 @@
#include <linux/posix_acl.h>
#include <linux/random.h>
#include <linux/sort.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
@@ -42,6 +43,7 @@ static int ceph_set_ino_cb(struct inode *inode, void *data)
{
ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
+ inode_set_iversion_raw(inode, 0);
return 0;
}
@@ -509,6 +511,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_WORK(&ci->i_work, ceph_inode_work);
ci->i_work_mask = 0;
+ memset(&ci->i_btime, '\0', sizeof(ci->i_btime));
ceph_fscache_inode_init(ci);
@@ -523,17 +526,20 @@ void ceph_free_inode(struct inode *inode)
kmem_cache_free(ceph_inode_cachep, ci);
}
-void ceph_destroy_inode(struct inode *inode)
+void ceph_evict_inode(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_inode_frag *frag;
struct rb_node *n;
- dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
+ dout("evict_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
+
+ truncate_inode_pages_final(&inode->i_data);
+ clear_inode(inode);
ceph_fscache_unregister_inode_cookie(ci);
- __ceph_remove_caps(inode);
+ __ceph_remove_caps(ci);
if (__ceph_has_any_quota(ci))
ceph_adjust_quota_realms_count(inode, false);
@@ -578,16 +584,6 @@ void ceph_destroy_inode(struct inode *inode)
ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
}
-int ceph_drop_inode(struct inode *inode)
-{
- /*
- * Positve dentry and corresponding inode are always accompanied
- * in MDS reply. So no need to keep inode in the cache after
- * dropping all its aliases.
- */
- return 1;
-}
-
static inline blkcnt_t calc_inode_blocks(u64 size)
{
return (size + (1<<9) - 1) >> 9;
@@ -795,6 +791,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
le64_to_cpu(info->version) > (ci->i_version & ~1)))
new_version = true;
+ /* Update change_attribute */
+ inode_set_max_iversion_raw(inode, iinfo->change_attr);
+
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
new_issued = ~issued & info_caps;
@@ -813,6 +812,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
from_kuid(&init_user_ns, inode->i_uid),
from_kgid(&init_user_ns, inode->i_gid));
+ ceph_decode_timespec64(&ci->i_btime, &iinfo->btime);
+ ceph_decode_timespec64(&ci->i_snap_btime, &iinfo->snap_btime);
}
if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
@@ -887,6 +888,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
iinfo->xattr_data, iinfo->xattr_len);
ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
ceph_forget_all_cached_acls(inode);
+ ceph_security_invalidate_secctx(inode);
xattr_blob = NULL;
}
@@ -1027,59 +1029,38 @@ out:
}
/*
- * caller should hold session s_mutex.
+ * caller should hold session s_mutex and dentry->d_lock.
*/
-static void update_dentry_lease(struct dentry *dentry,
- struct ceph_mds_reply_lease *lease,
- struct ceph_mds_session *session,
- unsigned long from_time,
- struct ceph_vino *tgt_vino,
- struct ceph_vino *dir_vino)
+static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time,
+ struct ceph_mds_session **old_lease_session)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
long unsigned duration = le32_to_cpu(lease->duration_ms);
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
- struct inode *dir;
- struct ceph_mds_session *old_lease_session = NULL;
- /*
- * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
- * we expect a negative dentry.
- */
- if (!tgt_vino && d_really_is_positive(dentry))
- return;
-
- if (tgt_vino && (d_really_is_negative(dentry) ||
- !ceph_ino_compare(d_inode(dentry), tgt_vino)))
- return;
-
- spin_lock(&dentry->d_lock);
dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
dentry, duration, ttl);
- dir = d_inode(dentry->d_parent);
-
- /* make sure parent matches dir_vino */
- if (!ceph_ino_compare(dir, dir_vino))
- goto out_unlock;
-
/* only track leases on regular dentries */
if (ceph_snap(dir) != CEPH_NOSNAP)
- goto out_unlock;
+ return;
di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
if (duration == 0) {
__ceph_dentry_dir_lease_touch(di);
- goto out_unlock;
+ return;
}
if (di->lease_gen == session->s_cap_gen &&
time_before(ttl, di->time))
- goto out_unlock; /* we already have a newer lease. */
+ return; /* we already have a newer lease. */
if (di->lease_session && di->lease_session != session) {
- old_lease_session = di->lease_session;
+ *old_lease_session = di->lease_session;
di->lease_session = NULL;
}
@@ -1092,6 +1073,62 @@ static void update_dentry_lease(struct dentry *dentry,
di->time = ttl;
__ceph_dentry_lease_touch(di);
+}
+
+static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time)
+{
+ struct ceph_mds_session *old_lease_session = NULL;
+ spin_lock(&dentry->d_lock);
+ __update_dentry_lease(dir, dentry, lease, session, from_time,
+ &old_lease_session);
+ spin_unlock(&dentry->d_lock);
+ if (old_lease_session)
+ ceph_put_mds_session(old_lease_session);
+}
+
+/*
+ * update dentry lease without having parent inode locked
+ */
+static void update_dentry_lease_careful(struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time,
+ char *dname, u32 dname_len,
+ struct ceph_vino *pdvino,
+ struct ceph_vino *ptvino)
+
+{
+ struct inode *dir;
+ struct ceph_mds_session *old_lease_session = NULL;
+
+ spin_lock(&dentry->d_lock);
+ /* make sure dentry's name matches target */
+ if (dentry->d_name.len != dname_len ||
+ memcmp(dentry->d_name.name, dname, dname_len))
+ goto out_unlock;
+
+ dir = d_inode(dentry->d_parent);
+ /* make sure parent matches dvino */
+ if (!ceph_ino_compare(dir, pdvino))
+ goto out_unlock;
+
+ /* make sure dentry's inode matches target. NULL ptvino means that
+ * we expect a negative dentry */
+ if (ptvino) {
+ if (d_really_is_negative(dentry))
+ goto out_unlock;
+ if (!ceph_ino_compare(d_inode(dentry), ptvino))
+ goto out_unlock;
+ } else {
+ if (d_really_is_positive(dentry))
+ goto out_unlock;
+ }
+
+ __update_dentry_lease(dir, dentry, lease, session,
+ from_time, &old_lease_session);
out_unlock:
spin_unlock(&dentry->d_lock);
if (old_lease_session)
@@ -1156,19 +1193,6 @@ static int splice_dentry(struct dentry **pdn, struct inode *in)
return 0;
}
-static int d_name_cmp(struct dentry *dentry, const char *name, size_t len)
-{
- int ret;
-
- /* take d_lock to ensure dentry->d_name stability */
- spin_lock(&dentry->d_lock);
- ret = dentry->d_name.len - len;
- if (!ret)
- ret = memcmp(dentry->d_name.name, name, len);
- spin_unlock(&dentry->d_lock);
- return ret;
-}
-
/*
* Incorporate results into the local cache. This is either just
* one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
@@ -1371,10 +1395,9 @@ retry_lookup:
} else if (have_lease) {
if (d_unhashed(dn))
d_add(dn, NULL);
- update_dentry_lease(dn, rinfo->dlease,
- session,
- req->r_request_started,
- NULL, &dvino);
+ update_dentry_lease(dir, dn,
+ rinfo->dlease, session,
+ req->r_request_started);
}
goto done;
}
@@ -1396,11 +1419,9 @@ retry_lookup:
}
if (have_lease) {
- tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
- tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
- update_dentry_lease(dn, rinfo->dlease, session,
- req->r_request_started,
- &tvino, &dvino);
+ update_dentry_lease(dir, dn,
+ rinfo->dlease, session,
+ req->r_request_started);
}
dout(" final dn %p\n", dn);
} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
@@ -1418,27 +1439,20 @@ retry_lookup:
err = splice_dentry(&req->r_dentry, in);
if (err < 0)
goto done;
- } else if (rinfo->head->is_dentry &&
- !d_name_cmp(req->r_dentry, rinfo->dname, rinfo->dname_len)) {
+ } else if (rinfo->head->is_dentry && req->r_dentry) {
+ /* parent inode is not locked, be carefull */
struct ceph_vino *ptvino = NULL;
-
- if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) ||
- le32_to_cpu(rinfo->dlease->duration_ms)) {
- dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
- dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
-
- if (rinfo->head->is_target) {
- tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
- tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
- ptvino = &tvino;
- }
-
- update_dentry_lease(req->r_dentry, rinfo->dlease,
- session, req->r_request_started, ptvino,
- &dvino);
- } else {
- dout("%s: no dentry lease or dir cap\n", __func__);
+ dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
+ dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
+ if (rinfo->head->is_target) {
+ tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
+ tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
+ ptvino = &tvino;
}
+ update_dentry_lease_careful(req->r_dentry, rinfo->dlease,
+ session, req->r_request_started,
+ rinfo->dname, rinfo->dname_len,
+ &dvino, ptvino);
}
done:
dout("fill_trace done err=%d\n", err);
@@ -1600,7 +1614,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
/* FIXME: release caps/leases if error occurs */
for (i = 0; i < rinfo->dir_nr; i++) {
struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
- struct ceph_vino tvino, dvino;
+ struct ceph_vino tvino;
dname.name = rde->name;
dname.len = rde->name_len;
@@ -1701,9 +1715,9 @@ retry_lookup:
ceph_dentry(dn)->offset = rde->offset;
- dvino = ceph_vino(d_inode(parent));
- update_dentry_lease(dn, rde->lease, req->r_session,
- req->r_request_started, &tvino, &dvino);
+ update_dentry_lease(d_inode(parent), dn,
+ rde->lease, req->r_session,
+ req->r_request_started);
if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
ret = fill_readdir_cache(d_inode(parent), dn,
@@ -2282,7 +2296,7 @@ static int statx_to_caps(u32 want)
{
int mask = 0;
- if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME))
+ if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME|STATX_BTIME))
mask |= CEPH_CAP_AUTH_SHARED;
if (want & (STATX_NLINK|STATX_CTIME))
@@ -2307,6 +2321,7 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
{
struct inode *inode = d_inode(path->dentry);
struct ceph_inode_info *ci = ceph_inode(inode);
+ u32 valid_mask = STATX_BASIC_STATS;
int err = 0;
/* Skip the getattr altogether if we're asked not to sync */
@@ -2319,6 +2334,16 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
generic_fillattr(inode, stat);
stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
+
+ /*
+ * btime on newly-allocated inodes is 0, so if this is still set to
+ * that, then assume that it's not valid.
+ */
+ if (ci->i_btime.tv_sec || ci->i_btime.tv_nsec) {
+ stat->btime = ci->i_btime;
+ valid_mask |= STATX_BTIME;
+ }
+
if (ceph_snap(inode) == CEPH_NOSNAP)
stat->dev = inode->i_sb->s_dev;
else
@@ -2342,7 +2367,6 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
stat->nlink = 1 + 1 + ci->i_subdirs;
}
- /* Mask off any higher bits (e.g. btime) until we have support */
- stat->result_mask = request_mask & STATX_BASIC_STATS;
+ stat->result_mask = request_mask & valid_mask;
return err;
}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c8a9b89b922d..920e9f048bd8 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -150,14 +150,13 @@ static int parse_reply_info_in(void **p, void *end,
info->pool_ns_data = *p;
*p += info->pool_ns_len;
}
- /* btime, change_attr */
- {
- struct ceph_timespec btime;
- u64 change_attr;
- ceph_decode_need(p, end, sizeof(btime), bad);
- ceph_decode_copy(p, &btime, sizeof(btime));
- ceph_decode_64_safe(p, end, change_attr, bad);
- }
+
+ /* btime */
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+
+ /* change attribute */
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
/* dir pin */
if (struct_v >= 2) {
@@ -166,6 +165,15 @@ static int parse_reply_info_in(void **p, void *end,
info->dir_pin = -ENODATA;
}
+ /* snapshot birth time, remains zero for v<=2 */
+ if (struct_v >= 3) {
+ ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
+ ceph_decode_copy(p, &info->snap_btime,
+ sizeof(info->snap_btime));
+ } else {
+ memset(&info->snap_btime, 0, sizeof(info->snap_btime));
+ }
+
*p = end;
} else {
if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
@@ -197,7 +205,14 @@ static int parse_reply_info_in(void **p, void *end,
}
}
+ if (features & CEPH_FEATURE_FS_BTIME) {
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
+ }
+
info->dir_pin = -ENODATA;
+ /* info->snap_btime remains zero */
}
return 0;
bad:
@@ -717,6 +732,7 @@ void ceph_mdsc_release_request(struct kref *kref)
ceph_pagelist_release(req->r_pagelist);
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
+ WARN_ON_ONCE(!list_empty(&req->r_wait));
kfree(req);
}
@@ -903,7 +919,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
struct inode *dir;
rcu_read_lock();
- parent = req->r_dentry->d_parent;
+ parent = READ_ONCE(req->r_dentry->d_parent);
dir = req->r_parent ? : d_inode_rcu(parent);
if (!dir || dir->i_sb != mdsc->fsc->sb) {
@@ -2135,7 +2151,7 @@ retry:
memcpy(path + pos, temp->d_name.name, temp->d_name.len);
}
spin_unlock(&temp->d_lock);
- temp = temp->d_parent;
+ temp = READ_ONCE(temp->d_parent);
/* Are we at the root? */
if (IS_ROOT(temp))
@@ -3727,42 +3743,35 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_num_mds ||
- memcmp(ceph_mdsmap_get_addr(oldmap, i),
- ceph_mdsmap_get_addr(newmap, i),
- sizeof(struct ceph_entity_addr))) {
- if (s->s_state == CEPH_MDS_SESSION_OPENING) {
- /* the session never opened, just close it
- * out now */
- get_session(s);
- __unregister_session(mdsc, s);
- __wake_requests(mdsc, &s->s_waiting);
- ceph_put_mds_session(s);
- } else if (i >= newmap->m_num_mds) {
- /* force close session for stopped mds */
- get_session(s);
- __unregister_session(mdsc, s);
- __wake_requests(mdsc, &s->s_waiting);
- kick_requests(mdsc, i);
- mutex_unlock(&mdsc->mutex);
+ if (i >= newmap->m_num_mds) {
+ /* force close session for stopped mds */
+ get_session(s);
+ __unregister_session(mdsc, s);
+ __wake_requests(mdsc, &s->s_waiting);
+ mutex_unlock(&mdsc->mutex);
- mutex_lock(&s->s_mutex);
- cleanup_session_requests(mdsc, s);
- remove_session_caps(s);
- mutex_unlock(&s->s_mutex);
+ mutex_lock(&s->s_mutex);
+ cleanup_session_requests(mdsc, s);
+ remove_session_caps(s);
+ mutex_unlock(&s->s_mutex);
- ceph_put_mds_session(s);
+ ceph_put_mds_session(s);
- mutex_lock(&mdsc->mutex);
- } else {
- /* just close it */
- mutex_unlock(&mdsc->mutex);
- mutex_lock(&s->s_mutex);
- mutex_lock(&mdsc->mutex);
- ceph_con_close(&s->s_con);
- mutex_unlock(&s->s_mutex);
- s->s_state = CEPH_MDS_SESSION_RESTARTING;
- }
+ mutex_lock(&mdsc->mutex);
+ kick_requests(mdsc, i);
+ continue;
+ }
+
+ if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
+ ceph_mdsmap_get_addr(newmap, i),
+ sizeof(struct ceph_entity_addr))) {
+ /* just close it */
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&s->s_mutex);
+ mutex_lock(&mdsc->mutex);
+ ceph_con_close(&s->s_con);
+ mutex_unlock(&s->s_mutex);
+ s->s_state = CEPH_MDS_SESSION_RESTARTING;
} else if (oldstate == newstate) {
continue; /* nothing new with this mds */
}
@@ -3931,31 +3940,33 @@ bad:
}
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
- struct inode *inode,
struct dentry *dentry, char action,
u32 seq)
{
struct ceph_msg *msg;
struct ceph_mds_lease *lease;
- int len = sizeof(*lease) + sizeof(u32);
- int dnamelen = 0;
+ struct inode *dir;
+ int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
- dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
- inode, dentry, ceph_lease_op_name(action), session->s_mds);
- dnamelen = dentry->d_name.len;
- len += dnamelen;
+ dout("lease_send_msg identry %p %s to mds%d\n",
+ dentry, ceph_lease_op_name(action), session->s_mds);
msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
- lease->ino = cpu_to_le64(ceph_vino(inode).ino);
- lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq);
- put_unaligned_le32(dnamelen, lease + 1);
- memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
+ spin_lock(&dentry->d_lock);
+ dir = d_inode(dentry->d_parent);
+ lease->ino = cpu_to_le64(ceph_ino(dir));
+ lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
+
+ put_unaligned_le32(dentry->d_name.len, lease + 1);
+ memcpy((void *)(lease + 1) + 4,
+ dentry->d_name.name, dentry->d_name.len);
+ spin_unlock(&dentry->d_lock);
/*
* if this is a preemptive lease RELEASE, no need to
* flush request stream, since the actual request will
@@ -4157,6 +4168,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
while ((req = __get_oldest_req(mdsc))) {
dout("wait_requests timed out on tid %llu\n",
req->r_tid);
+ list_del_init(&req->r_wait);
__unregister_request(mdsc, req);
}
}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index a83f28bc2387..f7c8603484fe 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -69,6 +69,9 @@ struct ceph_mds_reply_info_in {
u64 max_bytes;
u64 max_files;
s32 dir_pin;
+ struct ceph_timespec btime;
+ struct ceph_timespec snap_btime;
+ u64 change_attr;
};
struct ceph_mds_reply_dir_entry {
@@ -504,7 +507,6 @@ extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
extern void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry);
extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
- struct inode *inode,
struct dentry *dentry, char action,
u32 seq);
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 701b4fb0fb5a..ce2d00da5096 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -107,7 +107,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
struct ceph_mdsmap *m;
const void *start = *p;
int i, j, n;
- int err = -EINVAL;
+ int err;
u8 mdsmap_v, mdsmap_cv;
u16 mdsmap_ev;
@@ -183,8 +183,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
inc = ceph_decode_32(p);
state = ceph_decode_32(p);
state_seq = ceph_decode_64(p);
- ceph_decode_copy(p, &addr, sizeof(addr));
- ceph_decode_addr(&addr);
+ err = ceph_decode_entity_addr(p, end, &addr);
+ if (err)
+ goto corrupt;
ceph_decode_copy(p, &laggy_since, sizeof(laggy_since));
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
@@ -357,7 +358,7 @@ bad_ext:
nomem:
err = -ENOMEM;
goto out_err;
-bad:
+corrupt:
pr_err("corrupt mdsmap\n");
print_hex_dump(KERN_DEBUG, "mdsmap: ",
DUMP_PREFIX_OFFSET, 16, 1,
@@ -365,6 +366,9 @@ bad:
out_err:
ceph_mdsmap_destroy(m);
return ERR_PTR(err);
+bad:
+ err = -EINVAL;
+ goto corrupt;
}
void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index d629fc857450..de56dee60540 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -135,7 +135,7 @@ static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc,
return NULL;
mutex_lock(&qri->mutex);
- if (qri->inode) {
+ if (qri->inode && ceph_is_any_caps(qri->inode)) {
/* A request has already returned the inode */
mutex_unlock(&qri->mutex);
return qri->inode;
@@ -146,7 +146,18 @@ static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc,
mutex_unlock(&qri->mutex);
return NULL;
}
- in = ceph_lookup_inode(sb, realm->ino);
+ if (qri->inode) {
+ /* get caps */
+ int ret = __ceph_do_getattr(qri->inode, NULL,
+ CEPH_STAT_CAP_INODE, true);
+ if (ret >= 0)
+ in = qri->inode;
+ else
+ in = ERR_PTR(ret);
+ } else {
+ in = ceph_lookup_inode(sb, realm->ino);
+ }
+
if (IS_ERR(in)) {
pr_warn("Can't lookup inode %llx (err: %ld)\n",
realm->ino, PTR_ERR(in));
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 72c6c022f02b..4c6494eb02b5 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -3,6 +3,7 @@
#include <linux/sort.h>
#include <linux/slab.h>
+#include <linux/iversion.h>
#include "super.h"
#include "mds_client.h"
#include <linux/ceph/decode.h>
@@ -606,6 +607,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->mtime = inode->i_mtime;
capsnap->atime = inode->i_atime;
capsnap->ctime = inode->i_ctime;
+ capsnap->btime = ci->i_btime;
+ capsnap->change_attr = inode_peek_iversion_raw(inode);
capsnap->time_warp_seq = ci->i_time_warp_seq;
capsnap->truncate_size = ci->i_truncate_size;
capsnap->truncate_seq = ci->i_truncate_seq;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ed1b65a6c2c3..ab4868c7308e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -840,10 +840,10 @@ static int ceph_remount(struct super_block *sb, int *flags, char *data)
static const struct super_operations ceph_super_ops = {
.alloc_inode = ceph_alloc_inode,
- .destroy_inode = ceph_destroy_inode,
.free_inode = ceph_free_inode,
.write_inode = ceph_write_inode,
- .drop_inode = ceph_drop_inode,
+ .drop_inode = generic_delete_inode,
+ .evict_inode = ceph_evict_inode,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
.remount_fs = ceph_remount,
@@ -978,7 +978,7 @@ static int ceph_set_super(struct super_block *s, void *data)
s->s_d_op = &ceph_dentry_ops;
s->s_export_op = &ceph_export_ops;
- s->s_time_gran = 1000; /* 1000 ns == 1 us */
+ s->s_time_gran = 1;
ret = set_anon_super(s, NULL); /* what is that second arg for? */
if (ret != 0)
@@ -1159,17 +1159,15 @@ static int __init init_ceph(void)
goto out;
ceph_flock_init();
- ceph_xattr_init();
ret = register_filesystem(&ceph_fs_type);
if (ret)
- goto out_xattr;
+ goto out_caches;
pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
return 0;
-out_xattr:
- ceph_xattr_exit();
+out_caches:
destroy_caches();
out:
return ret;
@@ -1179,7 +1177,6 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
- ceph_xattr_exit();
destroy_caches();
}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index fbe6869a3f95..d2352fd95dbc 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -197,7 +197,8 @@ struct ceph_cap_snap {
u64 xattr_version;
u64 size;
- struct timespec64 mtime, atime, ctime;
+ u64 change_attr;
+ struct timespec64 mtime, atime, ctime, btime;
u64 time_warp_seq;
u64 truncate_size;
u32 truncate_seq;
@@ -384,6 +385,8 @@ struct ceph_inode_info {
int i_snap_realm_counter; /* snap realm (if caps) */
struct list_head i_snap_realm_item;
struct list_head i_snap_flush_item;
+ struct timespec64 i_btime;
+ struct timespec64 i_snap_btime;
struct work_struct i_work;
unsigned long i_work_mask;
@@ -544,7 +547,12 @@ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
long long ordered_count)
{
- smp_mb__before_atomic();
+ /*
+ * Makes sure operations that setup readdir cache (update page
+ * cache and i_size) are strongly ordered w.r.t. the following
+ * atomic64_set() operations.
+ */
+ smp_mb();
atomic64_set(&ci->i_complete_seq[0], release_count);
atomic64_set(&ci->i_complete_seq[1], ordered_count);
}
@@ -876,9 +884,8 @@ static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
extern const struct inode_operations ceph_file_iops;
extern struct inode *ceph_alloc_inode(struct super_block *sb);
-extern void ceph_destroy_inode(struct inode *inode);
+extern void ceph_evict_inode(struct inode *inode);
extern void ceph_free_inode(struct inode *inode);
-extern int ceph_drop_inode(struct inode *inode);
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);
@@ -921,10 +928,20 @@ ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
-extern void __init ceph_xattr_init(void);
-extern void ceph_xattr_exit(void);
extern const struct xattr_handler *ceph_xattr_handlers[];
+struct ceph_acl_sec_ctx {
+#ifdef CONFIG_CEPH_FS_POSIX_ACL
+ void *default_acl;
+ void *acl;
+#endif
+#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
+ void *sec_ctx;
+ u32 sec_ctxlen;
+#endif
+ struct ceph_pagelist *pagelist;
+};
+
#ifdef CONFIG_SECURITY
extern bool ceph_security_xattr_deadlock(struct inode *in);
extern bool ceph_security_xattr_wanted(struct inode *in);
@@ -939,21 +956,32 @@ static inline bool ceph_security_xattr_wanted(struct inode *in)
}
#endif
-/* acl.c */
-struct ceph_acls_info {
- void *default_acl;
- void *acl;
- struct ceph_pagelist *pagelist;
-};
+#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
+extern int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
+ struct ceph_acl_sec_ctx *ctx);
+extern void ceph_security_invalidate_secctx(struct inode *inode);
+#else
+static inline int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
+ struct ceph_acl_sec_ctx *ctx)
+{
+ return 0;
+}
+static inline void ceph_security_invalidate_secctx(struct inode *inode)
+{
+}
+#endif
+
+void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx);
+/* acl.c */
#ifdef CONFIG_CEPH_FS_POSIX_ACL
struct posix_acl *ceph_get_acl(struct inode *, int);
int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
- struct ceph_acls_info *info);
-void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
-void ceph_release_acls_info(struct ceph_acls_info *info);
+ struct ceph_acl_sec_ctx *as_ctx);
+void ceph_init_inode_acls(struct inode *inode,
+ struct ceph_acl_sec_ctx *as_ctx);
static inline void ceph_forget_all_cached_acls(struct inode *inode)
{
@@ -966,15 +994,12 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
#define ceph_set_acl NULL
static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
- struct ceph_acls_info *info)
+ struct ceph_acl_sec_ctx *as_ctx)
{
return 0;
}
static inline void ceph_init_inode_acls(struct inode *inode,
- struct ceph_acls_info *info)
-{
-}
-static inline void ceph_release_acls_info(struct ceph_acls_info *info)
+ struct ceph_acl_sec_ctx *as_ctx)
{
}
static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
@@ -1000,7 +1025,7 @@ extern void ceph_add_cap(struct inode *inode,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
-extern void __ceph_remove_caps(struct inode* inode);
+extern void __ceph_remove_caps(struct ceph_inode_info *ci);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
extern int ceph_is_any_caps(struct inode *inode);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 0cc42c8879e9..37b458a9af3a 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -8,6 +8,7 @@
#include <linux/ceph/decode.h>
#include <linux/xattr.h>
+#include <linux/security.h>
#include <linux/posix_acl_xattr.h>
#include <linux/slab.h>
@@ -17,26 +18,9 @@
static int __remove_xattr(struct ceph_inode_info *ci,
struct ceph_inode_xattr *xattr);
-static const struct xattr_handler ceph_other_xattr_handler;
-
-/*
- * List of handlers for synthetic system.* attributes. Other
- * attributes are handled directly.
- */
-const struct xattr_handler *ceph_xattr_handlers[] = {
-#ifdef CONFIG_CEPH_FS_POSIX_ACL
- &posix_acl_access_xattr_handler,
- &posix_acl_default_xattr_handler,
-#endif
- &ceph_other_xattr_handler,
- NULL,
-};
-
static bool ceph_is_valid_xattr(const char *name)
{
return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
- !strncmp(name, XATTR_SECURITY_PREFIX,
- XATTR_SECURITY_PREFIX_LEN) ||
!strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
!strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
}
@@ -48,8 +32,8 @@ static bool ceph_is_valid_xattr(const char *name)
struct ceph_vxattr {
char *name;
size_t name_size; /* strlen(name) + 1 (for '\0') */
- size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
- size_t size);
+ ssize_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
+ size_t size);
bool (*exists_cb)(struct ceph_inode_info *ci);
unsigned int flags;
};
@@ -68,8 +52,8 @@ static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
rcu_dereference_raw(fl->pool_ns) != NULL);
}
-static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
@@ -79,7 +63,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
const char *ns_field = " pool_namespace=";
char buf[128];
size_t len, total_len = 0;
- int ret;
+ ssize_t ret;
pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
@@ -96,18 +80,15 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
len = snprintf(buf, sizeof(buf),
"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
- ci->i_layout.object_size, (unsigned long long)pool);
+ ci->i_layout.object_size, pool);
total_len = len;
}
if (pool_ns)
total_len += strlen(ns_field) + pool_ns->len;
- if (!size) {
- ret = total_len;
- } else if (total_len > size) {
- ret = -ERANGE;
- } else {
+ ret = total_len;
+ if (size >= total_len) {
memcpy(val, buf, len);
ret = len;
if (pool_name) {
@@ -128,28 +109,55 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
return ret;
}
-static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
- char *val, size_t size)
+/*
+ * The convention with strings in xattrs is that they should not be NULL
+ * terminated, since we're returning the length with them. snprintf always
+ * NULL terminates however, so call it on a temporary buffer and then memcpy
+ * the result into place.
+ */
+static int ceph_fmt_xattr(char *val, size_t size, const char *fmt, ...)
{
- return snprintf(val, size, "%u", ci->i_layout.stripe_unit);
+ int ret;
+ va_list args;
+ char buf[96]; /* NB: reevaluate size if new vxattrs are added */
+
+ va_start(args, fmt);
+ ret = vsnprintf(buf, size ? sizeof(buf) : 0, fmt, args);
+ va_end(args);
+
+ /* Sanity check */
+ if (size && ret + 1 > sizeof(buf)) {
+ WARN_ONCE(true, "Returned length too big (%d)", ret);
+ return -E2BIG;
+ }
+
+ if (ret <= size)
+ memcpy(val, buf, ret);
+ return ret;
}
-static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
+static ssize_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
char *val, size_t size)
{
- return snprintf(val, size, "%u", ci->i_layout.stripe_count);
+ return ceph_fmt_xattr(val, size, "%u", ci->i_layout.stripe_unit);
+}
+
+static ssize_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
+ char *val, size_t size)
+{
+ return ceph_fmt_xattr(val, size, "%u", ci->i_layout.stripe_count);
}
-static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
- char *val, size_t size)
+static ssize_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
+ char *val, size_t size)
{
- return snprintf(val, size, "%u", ci->i_layout.object_size);
+ return ceph_fmt_xattr(val, size, "%u", ci->i_layout.object_size);
}
-static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
- char *val, size_t size)
+static ssize_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
+ char *val, size_t size)
{
- int ret;
+ ssize_t ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
s64 pool = ci->i_layout.pool_id;
@@ -157,21 +165,27 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
- if (pool_name)
- ret = snprintf(val, size, "%s", pool_name);
- else
- ret = snprintf(val, size, "%lld", (unsigned long long)pool);
+ if (pool_name) {
+ ret = strlen(pool_name);
+ if (ret <= size)
+ memcpy(val, pool_name, ret);
+ } else {
+ ret = ceph_fmt_xattr(val, size, "%lld", pool);
+ }
up_read(&osdc->lock);
return ret;
}
-static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
- char *val, size_t size)
+static ssize_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
+ char *val, size_t size)
{
- int ret = 0;
+ ssize_t ret = 0;
struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
+
if (ns) {
- ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
+ ret = ns->len;
+ if (ret <= size)
+ memcpy(val, ns->str, ret);
ceph_put_string(ns);
}
return ret;
@@ -179,53 +193,54 @@ static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
/* directories */
-static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_files + ci->i_subdirs);
}
-static size_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_files);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_files);
}
-static size_t ceph_vxattrcb_dir_subdirs(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_subdirs(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_subdirs);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_subdirs);
}
-static size_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
+ return ceph_fmt_xattr(val, size, "%lld",
+ ci->i_rfiles + ci->i_rsubdirs);
}
-static size_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_rfiles);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_rfiles);
}
-static size_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_rsubdirs);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_rsubdirs);
}
-static size_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld", ci->i_rbytes);
+ return ceph_fmt_xattr(val, size, "%lld", ci->i_rbytes);
}
-static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%lld.09%ld", ci->i_rctime.tv_sec,
- ci->i_rctime.tv_nsec);
+ return ceph_fmt_xattr(val, size, "%lld.%09ld", ci->i_rctime.tv_sec,
+ ci->i_rctime.tv_nsec);
}
/* dir pin */
@@ -234,10 +249,10 @@ static bool ceph_vxattrcb_dir_pin_exists(struct ceph_inode_info *ci)
return ci->i_dir_pin != -ENODATA;
}
-static size_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_dir_pin(struct ceph_inode_info *ci, char *val,
+ size_t size)
{
- return snprintf(val, size, "%d", (int)ci->i_dir_pin);
+ return ceph_fmt_xattr(val, size, "%d", (int)ci->i_dir_pin);
}
/* quotas */
@@ -254,23 +269,36 @@ static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci)
return ret;
}
-static size_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val,
- size_t size)
+static ssize_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val,
+ size_t size)
+{
+ return ceph_fmt_xattr(val, size, "max_bytes=%llu max_files=%llu",
+ ci->i_max_bytes, ci->i_max_files);
+}
+
+static ssize_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci,
+ char *val, size_t size)
{
- return snprintf(val, size, "max_bytes=%llu max_files=%llu",
- ci->i_max_bytes, ci->i_max_files);
+ return ceph_fmt_xattr(val, size, "%llu", ci->i_max_bytes);
}
-static size_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci,
- char *val, size_t size)
+static ssize_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
+ char *val, size_t size)
{
- return snprintf(val, size, "%llu", ci->i_max_bytes);
+ return ceph_fmt_xattr(val, size, "%llu", ci->i_max_files);
}
-static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
- char *val, size_t size)
+/* snapshots */
+static bool ceph_vxattrcb_snap_btime_exists(struct ceph_inode_info *ci)
{
- return snprintf(val, size, "%llu", ci->i_max_files);
+ return (ci->i_snap_btime.tv_sec != 0 || ci->i_snap_btime.tv_nsec != 0);
+}
+
+static ssize_t ceph_vxattrcb_snap_btime(struct ceph_inode_info *ci, char *val,
+ size_t size)
+{
+ return ceph_fmt_xattr(val, size, "%lld.%09ld", ci->i_snap_btime.tv_sec,
+ ci->i_snap_btime.tv_nsec);
}
#define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name
@@ -327,7 +355,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_RSTAT_FIELD(dir, rctime),
{
.name = "ceph.dir.pin",
- .name_size = sizeof("ceph.dir_pin"),
+ .name_size = sizeof("ceph.dir.pin"),
.getxattr_cb = ceph_vxattrcb_dir_pin,
.exists_cb = ceph_vxattrcb_dir_pin_exists,
.flags = VXATTR_FLAG_HIDDEN,
@@ -341,9 +369,15 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
},
XATTR_QUOTA_FIELD(quota, max_bytes),
XATTR_QUOTA_FIELD(quota, max_files),
+ {
+ .name = "ceph.snap.btime",
+ .name_size = sizeof("ceph.snap.btime"),
+ .getxattr_cb = ceph_vxattrcb_snap_btime,
+ .exists_cb = ceph_vxattrcb_snap_btime_exists,
+ .flags = VXATTR_FLAG_READONLY,
+ },
{ .name = NULL, 0 } /* Required table terminator */
};
-static size_t ceph_dir_vxattrs_name_size; /* total size of all names */
/* files */
@@ -360,9 +394,15 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
XATTR_LAYOUT_FIELD(file, layout, object_size),
XATTR_LAYOUT_FIELD(file, layout, pool),
XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
+ {
+ .name = "ceph.snap.btime",
+ .name_size = sizeof("ceph.snap.btime"),
+ .getxattr_cb = ceph_vxattrcb_snap_btime,
+ .exists_cb = ceph_vxattrcb_snap_btime_exists,
+ .flags = VXATTR_FLAG_READONLY,
+ },
{ .name = NULL, 0 } /* Required table terminator */
};
-static size_t ceph_file_vxattrs_name_size; /* total size of all names */
static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode)
{
@@ -373,47 +413,6 @@ static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode)
return NULL;
}
-static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs)
-{
- if (vxattrs == ceph_dir_vxattrs)
- return ceph_dir_vxattrs_name_size;
- if (vxattrs == ceph_file_vxattrs)
- return ceph_file_vxattrs_name_size;
- BUG_ON(vxattrs);
- return 0;
-}
-
-/*
- * Compute the aggregate size (including terminating '\0') of all
- * virtual extended attribute names in the given vxattr table.
- */
-static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
-{
- struct ceph_vxattr *vxattr;
- size_t size = 0;
-
- for (vxattr = vxattrs; vxattr->name; vxattr++) {
- if (!(vxattr->flags & VXATTR_FLAG_HIDDEN))
- size += vxattr->name_size;
- }
-
- return size;
-}
-
-/* Routines called at initialization and exit time */
-
-void __init ceph_xattr_init(void)
-{
- ceph_dir_vxattrs_name_size = vxattrs_name_size(ceph_dir_vxattrs);
- ceph_file_vxattrs_name_size = vxattrs_name_size(ceph_file_vxattrs);
-}
-
-void ceph_xattr_exit(void)
-{
- ceph_dir_vxattrs_name_size = 0;
- ceph_file_vxattrs_name_size = 0;
-}
-
static struct ceph_vxattr *ceph_match_vxattr(struct inode *inode,
const char *name)
{
@@ -523,8 +522,8 @@ static int __set_xattr(struct ceph_inode_info *ci,
dout("__set_xattr_val p=%p\n", p);
}
- dout("__set_xattr_val added %llx.%llx xattr %p %s=%.*s\n",
- ceph_vinop(&ci->vfs_inode), xattr, name, val_len, val);
+ dout("__set_xattr_val added %llx.%llx xattr %p %.*s=%.*s\n",
+ ceph_vinop(&ci->vfs_inode), xattr, name_len, name, val_len, val);
return 0;
}
@@ -823,7 +822,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
struct ceph_inode_xattr *xattr;
struct ceph_vxattr *vxattr = NULL;
int req_mask;
- int err;
+ ssize_t err;
/* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name);
@@ -835,8 +834,11 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
if (err)
return err;
err = -ENODATA;
- if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
+ if (!(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
err = vxattr->getxattr_cb(ci, value, size);
+ if (size && size < err)
+ err = -ERANGE;
+ }
return err;
}
@@ -897,10 +899,9 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
struct inode *inode = d_inode(dentry);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode);
- u32 vir_namelen = 0;
+ bool len_only = (size == 0);
u32 namelen;
int err;
- u32 len;
int i;
spin_lock(&ci->i_ceph_lock);
@@ -919,38 +920,45 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
err = __build_xattrs(inode);
if (err < 0)
goto out;
- /*
- * Start with virtual dir xattr names (if any) (including
- * terminating '\0' characters for each).
- */
- vir_namelen = ceph_vxattrs_name_size(vxattrs);
- /* adding 1 byte per each variable due to the null termination */
+ /* add 1 byte for each xattr due to the null termination */
namelen = ci->i_xattrs.names_size + ci->i_xattrs.count;
- err = -ERANGE;
- if (size && vir_namelen + namelen > size)
- goto out;
-
- err = namelen + vir_namelen;
- if (size == 0)
- goto out;
+ if (!len_only) {
+ if (namelen > size) {
+ err = -ERANGE;
+ goto out;
+ }
+ names = __copy_xattr_names(ci, names);
+ size -= namelen;
+ }
- names = __copy_xattr_names(ci, names);
/* virtual xattr names, too */
- err = namelen;
if (vxattrs) {
for (i = 0; vxattrs[i].name; i++) {
- if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) &&
- !(vxattrs[i].exists_cb &&
- !vxattrs[i].exists_cb(ci))) {
- len = sprintf(names, "%s", vxattrs[i].name);
- names += len + 1;
- err += len + 1;
+ size_t this_len;
+
+ if (vxattrs[i].flags & VXATTR_FLAG_HIDDEN)
+ continue;
+ if (vxattrs[i].exists_cb && !vxattrs[i].exists_cb(ci))
+ continue;
+
+ this_len = strlen(vxattrs[i].name) + 1;
+ namelen += this_len;
+ if (len_only)
+ continue;
+
+ if (this_len > size) {
+ err = -ERANGE;
+ goto out;
}
+
+ memcpy(names, vxattrs[i].name, this_len);
+ names += this_len;
+ size -= this_len;
}
}
-
+ err = namelen;
out:
spin_unlock(&ci->i_ceph_lock);
return err;
@@ -1206,4 +1214,138 @@ bool ceph_security_xattr_deadlock(struct inode *in)
spin_unlock(&ci->i_ceph_lock);
return ret;
}
+
+#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
+int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
+ struct ceph_acl_sec_ctx *as_ctx)
+{
+ struct ceph_pagelist *pagelist = as_ctx->pagelist;
+ const char *name;
+ size_t name_len;
+ int err;
+
+ err = security_dentry_init_security(dentry, mode, &dentry->d_name,
+ &as_ctx->sec_ctx,
+ &as_ctx->sec_ctxlen);
+ if (err < 0) {
+ WARN_ON_ONCE(err != -EOPNOTSUPP);
+ err = 0; /* do nothing */
+ goto out;
+ }
+
+ err = -ENOMEM;
+ if (!pagelist) {
+ pagelist = ceph_pagelist_alloc(GFP_KERNEL);
+ if (!pagelist)
+ goto out;
+ err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
+ if (err)
+ goto out;
+ ceph_pagelist_encode_32(pagelist, 1);
+ }
+
+ /*
+ * FIXME: Make security_dentry_init_security() generic. Currently
+ * It only supports single security module and only selinux has
+ * dentry_init_security hook.
+ */
+ name = XATTR_NAME_SELINUX;
+ name_len = strlen(name);
+ err = ceph_pagelist_reserve(pagelist,
+ 4 * 2 + name_len + as_ctx->sec_ctxlen);
+ if (err)
+ goto out;
+
+ if (as_ctx->pagelist) {
+ /* update count of KV pairs */
+ BUG_ON(pagelist->length <= sizeof(__le32));
+ if (list_is_singular(&pagelist->head)) {
+ le32_add_cpu((__le32*)pagelist->mapped_tail, 1);
+ } else {
+ struct page *page = list_first_entry(&pagelist->head,
+ struct page, lru);
+ void *addr = kmap_atomic(page);
+ le32_add_cpu((__le32*)addr, 1);
+ kunmap_atomic(addr);
+ }
+ } else {
+ as_ctx->pagelist = pagelist;
+ }
+
+ ceph_pagelist_encode_32(pagelist, name_len);
+ ceph_pagelist_append(pagelist, name, name_len);
+
+ ceph_pagelist_encode_32(pagelist, as_ctx->sec_ctxlen);
+ ceph_pagelist_append(pagelist, as_ctx->sec_ctx, as_ctx->sec_ctxlen);
+
+ err = 0;
+out:
+ if (pagelist && !as_ctx->pagelist)
+ ceph_pagelist_release(pagelist);
+ return err;
+}
+
+void ceph_security_invalidate_secctx(struct inode *inode)
+{
+ security_inode_invalidate_secctx(inode);
+}
+
+static int ceph_xattr_set_security_label(const struct xattr_handler *handler,
+ struct dentry *unused, struct inode *inode,
+ const char *key, const void *buf,
+ size_t buflen, int flags)
+{
+ if (security_ismaclabel(key)) {
+ const char *name = xattr_full_name(handler, key);
+ return __ceph_setxattr(inode, name, buf, buflen, flags);
+ }
+ return -EOPNOTSUPP;
+}
+
+static int ceph_xattr_get_security_label(const struct xattr_handler *handler,
+ struct dentry *unused, struct inode *inode,
+ const char *key, void *buf, size_t buflen)
+{
+ if (security_ismaclabel(key)) {
+ const char *name = xattr_full_name(handler, key);
+ return __ceph_getxattr(inode, name, buf, buflen);
+ }
+ return -EOPNOTSUPP;
+}
+
+static const struct xattr_handler ceph_security_label_handler = {
+ .prefix = XATTR_SECURITY_PREFIX,
+ .get = ceph_xattr_get_security_label,
+ .set = ceph_xattr_set_security_label,
+};
+#endif
#endif
+
+void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx)
+{
+#ifdef CONFIG_CEPH_FS_POSIX_ACL
+ posix_acl_release(as_ctx->acl);
+ posix_acl_release(as_ctx->default_acl);
+#endif
+#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
+ security_release_secctx(as_ctx->sec_ctx, as_ctx->sec_ctxlen);
+#endif
+ if (as_ctx->pagelist)
+ ceph_pagelist_release(as_ctx->pagelist);
+}
+
+/*
+ * List of handlers for synthetic system.* attributes. Other
+ * attributes are handled directly.
+ */
+const struct xattr_handler *ceph_xattr_handlers[] = {
+#ifdef CONFIG_CEPH_FS_POSIX_ACL
+ &posix_acl_access_xattr_handler,
+ &posix_acl_default_xattr_handler,
+#endif
+#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
+ &ceph_security_label_handler,
+#endif
+ &ceph_other_xattr_handler,
+ NULL,
+};
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 65a38c4a02a1..39e6f4c57580 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -211,6 +211,7 @@ DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facin
CEPH_FEATURE_MON_STATEFUL_SUB | \
CEPH_FEATURE_CRUSH_TUNABLES5 | \
CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING | \
+ CEPH_FEATURE_MSG_ADDR2 | \
CEPH_FEATURE_CEPHX_V2)
#define CEPH_FEATURES_REQUIRED_DEFAULT 0
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 3ac0feaf2b5e..cb21c5cf12c3 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -682,7 +682,7 @@ extern const char *ceph_cap_op_name(int op);
/* flags field in client cap messages (version >= 10) */
#define CEPH_CLIENT_CAPS_SYNC (1<<0)
#define CEPH_CLIENT_CAPS_NO_CAPSNAP (1<<1)
-#define CEPH_CLIENT_CAPS_PENDING_CAPSNAP (1<<2);
+#define CEPH_CLIENT_CAPS_PENDING_CAPSNAP (1<<2)
/*
* caps message, used for capability callbacks, acks, requests, etc.
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
index bea6c77d2093..17bc7584d1fe 100644
--- a/include/linux/ceph/cls_lock_client.h
+++ b/include/linux/ceph/cls_lock_client.h
@@ -52,4 +52,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
char *lock_name, u8 *type, char **tag,
struct ceph_locker **lockers, u32 *num_lockers);
+int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
+ char *lock_name, u8 type, char *cookie, char *tag);
+
#endif
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index a6c2a48d42e0..450384fe487c 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -218,18 +218,27 @@ static inline void ceph_encode_timespec64(struct ceph_timespec *tv,
/*
* sockaddr_storage <-> ceph_sockaddr
*/
-static inline void ceph_encode_addr(struct ceph_entity_addr *a)
+#define CEPH_ENTITY_ADDR_TYPE_NONE 0
+#define CEPH_ENTITY_ADDR_TYPE_LEGACY __cpu_to_le32(1)
+
+static inline void ceph_encode_banner_addr(struct ceph_entity_addr *a)
{
__be16 ss_family = htons(a->in_addr.ss_family);
a->in_addr.ss_family = *(__u16 *)&ss_family;
+
+ /* Banner addresses require TYPE_NONE */
+ a->type = CEPH_ENTITY_ADDR_TYPE_NONE;
}
-static inline void ceph_decode_addr(struct ceph_entity_addr *a)
+static inline void ceph_decode_banner_addr(struct ceph_entity_addr *a)
{
__be16 ss_family = *(__be16 *)&a->in_addr.ss_family;
a->in_addr.ss_family = ntohs(ss_family);
WARN_ON(a->in_addr.ss_family == 512);
+ a->type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
}
+extern int ceph_decode_entity_addr(void **p, void *end,
+ struct ceph_entity_addr *addr);
/*
* encoders
*/
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 337d5049ff93..82156da3c650 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -84,11 +84,13 @@ struct ceph_options {
#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
/*
- * Handle the largest possible rbd object in one message.
+ * The largest possible rbd data object is 32M.
+ * The largest possible rbd object map object is 64M.
+ *
* There is no limit on the size of cephfs objects, but it has to obey
* rsize and wsize mount options anyway.
*/
-#define CEPH_MSG_MAX_DATA_LEN (32*1024*1024)
+#define CEPH_MSG_MAX_DATA_LEN (64*1024*1024)
#define CEPH_AUTH_NAME_DEFAULT "guest"
@@ -299,10 +301,6 @@ int ceph_wait_for_latest_osdmap(struct ceph_client *client,
/* pagevec.c */
extern void ceph_release_page_vector(struct page **pages, int num_pages);
-
-extern struct page **ceph_get_direct_page_vector(const void __user *data,
- int num_pages,
- bool write_page);
extern void ceph_put_page_vector(struct page **pages, int num_pages,
bool dirty);
extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 3a4688af7455..b4d134d3312a 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -104,7 +104,6 @@ struct ceph_mon_client {
#endif
};
-extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end);
extern int ceph_monmap_contains(struct ceph_monmap *m,
struct ceph_entity_addr *addr);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2294f963dab7..ad7fe5d10dcd 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -198,9 +198,9 @@ struct ceph_osd_request {
bool r_mempool;
struct completion r_completion; /* private to osd_client.c */
ceph_osdc_callback_t r_callback;
- struct list_head r_unsafe_item;
struct inode *r_inode; /* for use by callbacks */
+ struct list_head r_private_item; /* ditto */
void *r_priv; /* ditto */
/* set by submitter */
@@ -389,6 +389,14 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err);
+#define osd_req_op_data(oreq, whch, typ, fld) \
+({ \
+ struct ceph_osd_request *__oreq = (oreq); \
+ unsigned int __whch = (whch); \
+ BUG_ON(__whch >= __oreq->r_num_ops); \
+ &__oreq->r_ops[__whch].typ.fld; \
+})
+
extern void osd_req_op_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode, u32 flags);
@@ -497,7 +505,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
- struct page *resp_page, size_t *resp_len);
+ struct page **resp_pages, size_t *resp_len);
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h
index cbd0d24b7148..3486636c0e6e 100644
--- a/include/linux/ceph/striper.h
+++ b/include/linux/ceph/striper.h
@@ -66,4 +66,6 @@ int ceph_extent_to_file(struct ceph_file_layout *l,
struct ceph_file_extent **file_extents,
u32 *num_file_extents);
+u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size);
+
#endif
diff --git a/include/linux/iversion.h b/include/linux/iversion.h
index be50ef7cedab..2917ef990d43 100644
--- a/include/linux/iversion.h
+++ b/include/linux/iversion.h
@@ -113,6 +113,30 @@ inode_peek_iversion_raw(const struct inode *inode)
}
/**
+ * inode_set_max_iversion_raw - update i_version new value is larger
+ * @inode: inode to set
+ * @val: new i_version to set
+ *
+ * Some self-managed filesystems (e.g Ceph) will only update the i_version
+ * value if the new value is larger than the one we already have.
+ */
+static inline void
+inode_set_max_iversion_raw(struct inode *inode, u64 val)
+{
+ u64 cur, old;
+
+ cur = inode_peek_iversion_raw(inode);
+ for (;;) {
+ if (cur > val)
+ break;
+ old = atomic64_cmpxchg(&inode->i_version, cur, val);
+ if (likely(old == cur))
+ break;
+ cur = old;
+ }
+}
+
+/**
* inode_set_iversion - set i_version to a particular value
* @inode: inode to set
* @val: new i_version value to set
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index db09defe27d0..59d0ba2072de 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -5,7 +5,7 @@
obj-$(CONFIG_CEPH_LIB) += libceph.o
libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
- mon_client.o \
+ mon_client.o decode.o \
cls_lock_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
striper.o \
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 4cc28541281b..17447c19d937 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -6,6 +6,7 @@
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
+#include <linux/ceph/libceph.h>
/**
* ceph_cls_lock - grab rados lock for object
@@ -264,8 +265,11 @@ static int decode_locker(void **p, void *end, struct ceph_locker *locker)
return ret;
*p += sizeof(struct ceph_timespec); /* skip expiration */
- ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
- ceph_decode_addr(&locker->info.addr);
+
+ ret = ceph_decode_entity_addr(p, end, &locker->info.addr);
+ if (ret)
+ return ret;
+
len = ceph_decode_32(p);
*p += len; /* skip description */
@@ -360,7 +364,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
dout("%s lock_name %s\n", __func__, lock_name);
ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
CEPH_OSD_FLAG_READ, get_info_op_page,
- get_info_op_buf_size, reply_page, &reply_len);
+ get_info_op_buf_size, &reply_page, &reply_len);
dout("%s: status %d\n", __func__, ret);
if (ret >= 0) {
@@ -375,3 +379,47 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
return ret;
}
EXPORT_SYMBOL(ceph_cls_lock_info);
+
+int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
+ char *lock_name, u8 type, char *cookie, char *tag)
+{
+ int assert_op_buf_size;
+ int name_len = strlen(lock_name);
+ int cookie_len = strlen(cookie);
+ int tag_len = strlen(tag);
+ struct page **pages;
+ void *p, *end;
+ int ret;
+
+ assert_op_buf_size = name_len + sizeof(__le32) +
+ cookie_len + sizeof(__le32) +
+ tag_len + sizeof(__le32) +
+ sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
+ if (assert_op_buf_size > PAGE_SIZE)
+ return -E2BIG;
+
+ ret = osd_req_op_cls_init(req, which, "lock", "assert_locked");
+ if (ret)
+ return ret;
+
+ pages = ceph_alloc_page_vector(1, GFP_NOIO);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ p = page_address(pages[0]);
+ end = p + assert_op_buf_size;
+
+ /* encode cls_lock_assert_op struct */
+ ceph_start_encoding(&p, 1, 1,
+ assert_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+ ceph_encode_string(&p, end, lock_name, name_len);
+ ceph_encode_8(&p, type);
+ ceph_encode_string(&p, end, cookie, cookie_len);
+ ceph_encode_string(&p, end, tag, tag_len);
+ WARN_ON(p != end);
+
+ osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size,
+ 0, false, true);
+ return 0;
+}
+EXPORT_SYMBOL(ceph_cls_assert_locked);
diff --git a/net/ceph/decode.c b/net/ceph/decode.c
new file mode 100644
index 000000000000..eea529595a7a
--- /dev/null
+++ b/net/ceph/decode.c
@@ -0,0 +1,84 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/decode.h>
+
+static int
+ceph_decode_entity_addr_versioned(void **p, void *end,
+ struct ceph_entity_addr *addr)
+{
+ int ret;
+ u8 struct_v;
+ u32 struct_len, addr_len;
+ void *struct_end;
+
+ ret = ceph_start_decoding(p, end, 1, "entity_addr_t", &struct_v,
+ &struct_len);
+ if (ret)
+ goto bad;
+
+ ret = -EINVAL;
+ struct_end = *p + struct_len;
+
+ ceph_decode_copy_safe(p, end, &addr->type, sizeof(addr->type), bad);
+
+ ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad);
+
+ ceph_decode_32_safe(p, end, addr_len, bad);
+ if (addr_len > sizeof(addr->in_addr))
+ goto bad;
+
+ memset(&addr->in_addr, 0, sizeof(addr->in_addr));
+ if (addr_len) {
+ ceph_decode_copy_safe(p, end, &addr->in_addr, addr_len, bad);
+
+ addr->in_addr.ss_family =
+ le16_to_cpu((__force __le16)addr->in_addr.ss_family);
+ }
+
+ /* Advance past anything the client doesn't yet understand */
+ *p = struct_end;
+ ret = 0;
+bad:
+ return ret;
+}
+
+static int
+ceph_decode_entity_addr_legacy(void **p, void *end,
+ struct ceph_entity_addr *addr)
+{
+ int ret = -EINVAL;
+
+ /* Skip rest of type field */
+ ceph_decode_skip_n(p, end, 3, bad);
+
+ /*
+ * Clients that don't support ADDR2 always send TYPE_NONE, change it
+ * to TYPE_LEGACY for forward compatibility.
+ */
+ addr->type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
+ ceph_decode_copy_safe(p, end, &addr->nonce, sizeof(addr->nonce), bad);
+ memset(&addr->in_addr, 0, sizeof(addr->in_addr));
+ ceph_decode_copy_safe(p, end, &addr->in_addr,
+ sizeof(addr->in_addr), bad);
+ addr->in_addr.ss_family =
+ be16_to_cpu((__force __be16)addr->in_addr.ss_family);
+ ret = 0;
+bad:
+ return ret;
+}
+
+int
+ceph_decode_entity_addr(void **p, void *end, struct ceph_entity_addr *addr)
+{
+ u8 marker;
+
+ ceph_decode_8_safe(p, end, marker, bad);
+ if (marker == 1)
+ return ceph_decode_entity_addr_versioned(p, end, addr);
+ else if (marker == 0)
+ return ceph_decode_entity_addr_legacy(p, end, addr);
+bad:
+ return -EINVAL;
+}
+EXPORT_SYMBOL(ceph_decode_entity_addr);
+
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index a33402c99321..962f521c863e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -199,12 +199,14 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
switch (ss.ss_family) {
case AF_INET:
- snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
+ snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
+ le32_to_cpu(addr->type), &in4->sin_addr,
ntohs(in4->sin_port));
break;
case AF_INET6:
- snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
+ snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
+ le32_to_cpu(addr->type), &in6->sin6_addr,
ntohs(in6->sin6_port));
break;
@@ -220,7 +222,7 @@ EXPORT_SYMBOL(ceph_pr_addr);
static void encode_my_addr(struct ceph_messenger *msgr)
{
memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
- ceph_encode_addr(&msgr->my_enc_addr);
+ ceph_encode_banner_addr(&msgr->my_enc_addr);
}
/*
@@ -1732,12 +1734,14 @@ static int read_partial_banner(struct ceph_connection *con)
ret = read_partial(con, end, size, &con->actual_peer_addr);
if (ret <= 0)
goto out;
+ ceph_decode_banner_addr(&con->actual_peer_addr);
size = sizeof (con->peer_addr_for_me);
end += size;
ret = read_partial(con, end, size, &con->peer_addr_for_me);
if (ret <= 0)
goto out;
+ ceph_decode_banner_addr(&con->peer_addr_for_me);
out:
return ret;
@@ -1981,6 +1985,7 @@ int ceph_parse_ips(const char *c, const char *end,
}
addr_set_port(&addr[i], port);
+ addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
@@ -2011,9 +2016,6 @@ static int process_banner(struct ceph_connection *con)
if (verify_hello(con) < 0)
return -1;
- ceph_decode_addr(&con->actual_peer_addr);
- ceph_decode_addr(&con->peer_addr_for_me);
-
/*
* Make sure the other end is who we wanted. note that the other
* end may not yet know their ip address, so if it's 0.0.0.0, give
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 895679d3529b..0520bf9825aa 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -39,7 +39,7 @@ static int __validate_auth(struct ceph_mon_client *monc);
/*
* Decode a monmap blob (e.g., during mount).
*/
-struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
+static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
struct ceph_monmap *m = NULL;
int i, err = -EINVAL;
@@ -50,7 +50,7 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
ceph_decode_32_safe(&p, end, len, bad);
ceph_decode_need(&p, end, len, bad);
- dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
+ dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
p += sizeof(u16); /* skip version */
ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
@@ -58,7 +58,6 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
epoch = ceph_decode_32(&p);
num_mon = ceph_decode_32(&p);
- ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
if (num_mon > CEPH_MAX_MON)
goto bad;
@@ -68,17 +67,22 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
m->fsid = fsid;
m->epoch = epoch;
m->num_mon = num_mon;
- ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
- for (i = 0; i < num_mon; i++)
- ceph_decode_addr(&m->mon_inst[i].addr);
-
+ for (i = 0; i < num_mon; ++i) {
+ struct ceph_entity_inst *inst = &m->mon_inst[i];
+
+ /* copy name portion */
+ ceph_decode_copy_safe(&p, end, &inst->name,
+ sizeof(inst->name), bad);
+ err = ceph_decode_entity_addr(&p, end, &inst->addr);
+ if (err)
+ goto bad;
+ }
dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
m->num_mon);
for (i = 0; i < m->num_mon; i++)
dout("monmap_decode mon%d is %s\n", i,
ceph_pr_addr(&m->mon_inst[i].addr));
return m;
-
bad:
dout("monmap_decode failed with %d\n", err);
kfree(m);
@@ -469,6 +473,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
if (IS_ERR(monmap)) {
pr_err("problem decoding monmap, %d\n",
(int)PTR_ERR(monmap));
+ ceph_msg_dump(msg);
goto out;
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 9a8eca5eda65..0b2df09b2554 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -171,14 +171,6 @@ static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
osd_data->num_bvecs = num_bvecs;
}
-#define osd_req_op_data(oreq, whch, typ, fld) \
-({ \
- struct ceph_osd_request *__oreq = (oreq); \
- unsigned int __whch = (whch); \
- BUG_ON(__whch >= __oreq->r_num_ops); \
- &__oreq->r_ops[__whch].typ.fld; \
-})
-
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
@@ -478,7 +470,7 @@ static void request_release_checks(struct ceph_osd_request *req)
{
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
- WARN_ON(!list_empty(&req->r_unsafe_item));
+ WARN_ON(!list_empty(&req->r_private_item));
WARN_ON(req->r_osd);
}
@@ -538,7 +530,7 @@ static void request_init(struct ceph_osd_request *req)
init_completion(&req->r_completion);
RB_CLEAR_NODE(&req->r_node);
RB_CLEAR_NODE(&req->r_mc_node);
- INIT_LIST_HEAD(&req->r_unsafe_item);
+ INIT_LIST_HEAD(&req->r_private_item);
target_init(&req->r_t);
}
@@ -4914,20 +4906,26 @@ static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
ret = ceph_start_decoding(p, end, 2, "watch_item_t",
&struct_v, &struct_len);
if (ret)
- return ret;
+ goto bad;
+
+ ret = -EINVAL;
+ ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
+ ceph_decode_64_safe(p, end, item->cookie, bad);
+ ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
- ceph_decode_copy(p, &item->name, sizeof(item->name));
- item->cookie = ceph_decode_64(p);
- *p += 4; /* skip timeout_seconds */
if (struct_v >= 2) {
- ceph_decode_copy(p, &item->addr, sizeof(item->addr));
- ceph_decode_addr(&item->addr);
+ ret = ceph_decode_entity_addr(p, end, &item->addr);
+ if (ret)
+ goto bad;
+ } else {
+ ret = 0;
}
dout("%s %s%llu cookie %llu addr %s\n", __func__,
ENTITY_NAME(item->name), item->cookie,
ceph_pr_addr(&item->addr));
- return 0;
+bad:
+ return ret;
}
static int decode_watchers(void **p, void *end,
@@ -5044,12 +5042,12 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
const char *class, const char *method,
unsigned int flags,
struct page *req_page, size_t req_len,
- struct page *resp_page, size_t *resp_len)
+ struct page **resp_pages, size_t *resp_len)
{
struct ceph_osd_request *req;
int ret;
- if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
+ if (req_len > PAGE_SIZE)
return -E2BIG;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
@@ -5067,8 +5065,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (req_page)
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
0, false, false);
- if (resp_page)
- osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+ if (resp_pages)
+ osd_req_op_cls_response_data_pages(req, 0, resp_pages,
*resp_len, 0, false, false);
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
@@ -5079,7 +5077,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
ret = req->r_ops[0].rval;
- if (resp_page)
+ if (resp_pages)
*resp_len = req->r_ops[0].outdata_len;
}
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 48a31dc9161c..90437906b7bc 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1489,11 +1489,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
/* osd_state, osd_weight, osd_addrs->client_addr */
ceph_decode_need(p, end, 3*sizeof(u32) +
- map->max_osd*((struct_v >= 5 ? sizeof(u32) :
- sizeof(u8)) +
- sizeof(*map->osd_weight) +
- sizeof(*map->osd_addr)), e_inval);
-
+ map->max_osd*(struct_v >= 5 ? sizeof(u32) :
+ sizeof(u8)) +
+ sizeof(*map->osd_weight), e_inval);
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
@@ -1514,9 +1512,11 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
- ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
- for (i = 0; i < map->max_osd; i++)
- ceph_decode_addr(&map->osd_addr[i]);
+ for (i = 0; i < map->max_osd; i++) {
+ err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]);
+ if (err)
+ goto bad;
+ }
/* pg_temp */
err = decode_pg_temp(p, end, map);
@@ -1618,12 +1618,17 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
void *new_state;
void *new_weight_end;
u32 len;
+ int i;
new_up_client = *p;
ceph_decode_32_safe(p, end, len, e_inval);
- len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
- ceph_decode_need(p, end, len, e_inval);
- *p += len;
+ for (i = 0; i < len; ++i) {
+ struct ceph_entity_addr addr;
+
+ ceph_decode_skip_32(p, end, e_inval);
+ if (ceph_decode_entity_addr(p, end, &addr))
+ goto e_inval;
+ }
new_state = *p;
ceph_decode_32_safe(p, end, len, e_inval);
@@ -1699,9 +1704,9 @@ static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
struct ceph_entity_addr addr;
osd = ceph_decode_32(p);
- ceph_decode_copy(p, &addr, sizeof(addr));
- ceph_decode_addr(&addr);
BUG_ON(osd >= map->max_osd);
+ if (ceph_decode_entity_addr(p, end, &addr))
+ goto e_inval;
pr_info("osd%d up\n", osd);
map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
map->osd_addr[osd] = addr;
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index 74cafc0142ea..64305e7056a1 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -10,39 +10,6 @@
#include <linux/ceph/libceph.h>
-/*
- * build a vector of user pages
- */
-struct page **ceph_get_direct_page_vector(const void __user *data,
- int num_pages, bool write_page)
-{
- struct page **pages;
- int got = 0;
- int rc = 0;
-
- pages = kmalloc_array(num_pages, sizeof(*pages), GFP_NOFS);
- if (!pages)
- return ERR_PTR(-ENOMEM);
-
- while (got < num_pages) {
- rc = get_user_pages_fast(
- (unsigned long)data + ((unsigned long)got * PAGE_SIZE),
- num_pages - got, write_page ? FOLL_WRITE : 0, pages + got);
- if (rc < 0)
- break;
- BUG_ON(rc == 0);
- got += rc;
- }
- if (rc < 0)
- goto fail;
- return pages;
-
-fail:
- ceph_put_page_vector(pages, got, false);
- return ERR_PTR(rc);
-}
-EXPORT_SYMBOL(ceph_get_direct_page_vector);
-
void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
{
int i;
diff --git a/net/ceph/striper.c b/net/ceph/striper.c
index c36462dc86b7..3b3fa75d1189 100644
--- a/net/ceph/striper.c
+++ b/net/ceph/striper.c
@@ -259,3 +259,20 @@ int ceph_extent_to_file(struct ceph_file_layout *l,
return 0;
}
EXPORT_SYMBOL(ceph_extent_to_file);
+
+u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size)
+{
+ u64 period = (u64)l->stripe_count * l->object_size;
+ u64 num_periods = DIV64_U64_ROUND_UP(size, period);
+ u64 remainder_bytes;
+ u64 remainder_objs = 0;
+
+ div64_u64_rem(size, period, &remainder_bytes);
+ if (remainder_bytes > 0 &&
+ remainder_bytes < (u64)l->stripe_count * l->stripe_unit)
+ remainder_objs = l->stripe_count -
+ DIV_ROUND_UP_ULL(remainder_bytes, l->stripe_unit);
+
+ return num_periods * l->stripe_count - remainder_objs;
+}
+EXPORT_SYMBOL(ceph_get_num_objects);