summaryrefslogtreecommitdiff
path: root/fs/dlm/user.c
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2024-03-28 18:48:41 +0300
committerDavid Teigland <teigland@redhat.com>2024-04-01 21:31:12 +0300
commit986ae3c2a8dfc1e229cabe9cc2e0b01b721c8980 (patch)
tree6c036bb2a5ba03325b2c6c4ecca07d74c85bf958 /fs/dlm/user.c
parent0175e51b5134b55c89364aae68ec16271c67e472 (diff)
downloadlinux-986ae3c2a8dfc1e229cabe9cc2e0b01b721c8980.tar.xz
dlm: fix race between final callback and remove
This patch fixes the following issue: node 1 is dir node 2 is master node 3 is other 1->2: unlock 2: put final lkb, rsb moved to toss 2->1: unlock_reply 1: queue lkb callback with EUNLOCK 2->1: remove 1: receive_remove ignored (rsb on keep because of queued lkb callback) 1: complete lkb callback, put_lkb, move rsb to toss 3->1: lookup 1->3: lookup_reply master=2 3->2: request 2->3: request_reply EBADR In summary: An unexpected lkb reference causes the rsb to remain on the wrong list. The rsb being on the wrong list causes receive_remove to be ignored. An ignored receive_remove causes inconsistent dir and master state. This sequence requires an unusually long delay in delivering the unlock callback, because the remove message from 2->1 usually happens after some seconds. So, it's not known exactly how frequently this sequence occurs in pratice. It's possible that the same end result could also have another unknown cause. The solution for this issue is to further separate callback state from the lkb, so that an lkb reference (and from that, an rsb ref) are not held while a callback remains queued. Then, within the unlock_reply, the lkb will be freed and the rsb moved to the toss list. So, the receive_remove will not be ignored. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/user.c')
-rw-r--r--fs/dlm/user.c86
1 files changed, 21 insertions, 65 deletions
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index fa99b6074e5c..334a6d64d413 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -21,6 +21,7 @@
#include "dlm_internal.h"
#include "lockspace.h"
#include "lock.h"
+#include "lvb_table.h"
#include "user.h"
#include "ast.h"
#include "config.h"
@@ -144,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res,
}
#endif
-/* should held proc->asts_spin lock */
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
-{
- struct dlm_callback *cb, *safe;
-
- list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
- list_del(&cb->list);
- kref_put(&cb->ref, dlm_release_callback);
- }
-
- clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-
- /* invalidate */
- dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
- dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
- lkb->lkb_last_bast_mode = -1;
-}
-
/* Figure out if this lock is at the end of its life and no longer
available for the application to use. The lkb still exists until
the final ast is read. A lock becomes EOL in three situations:
@@ -198,6 +181,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
struct dlm_ls *ls;
struct dlm_user_args *ua;
struct dlm_user_proc *proc;
+ struct dlm_callback *cb;
int rv;
if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
@@ -229,11 +213,18 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
spin_lock(&proc->asts_spin);
- rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+ rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
switch (rv) {
case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
- kref_get(&lkb->lkb_ref);
- list_add_tail(&lkb->lkb_cb_list, &proc->asts);
+ cb->ua = *ua;
+ cb->lkb_lksb = &cb->ua.lksb;
+ if (cb->copy_lvb) {
+ memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
+ DLM_USER_LVB_LEN);
+ cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
+ }
+
+ list_add_tail(&cb->list, &proc->asts);
wake_up_interruptible(&proc->wait);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
@@ -801,10 +792,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct dlm_user_proc *proc = file->private_data;
- struct dlm_lkb *lkb;
DECLARE_WAITQUEUE(wait, current);
struct dlm_callback *cb;
- struct dlm_rsb *rsb;
int rv, ret;
if (count == sizeof(struct dlm_device_version)) {
@@ -824,8 +813,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
#endif
return -EINVAL;
- try_another:
-
/* do we really need this? can a read happen after a close? */
if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
return -EINVAL;
@@ -860,55 +847,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
without removing lkb_cb_list; so empty lkb_cb_list is always
consistent with empty lkb_callbacks */
- lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
-
- rv = dlm_dequeue_lkb_callback(lkb, &cb);
- switch (rv) {
- case DLM_DEQUEUE_CALLBACK_EMPTY:
- /* this shouldn't happen; lkb should have been removed from
- * list when last item was dequeued
- */
- log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
- list_del_init(&lkb->lkb_cb_list);
- spin_unlock(&proc->asts_spin);
- /* removes ref for proc->asts, may cause lkb to be freed */
- dlm_put_lkb(lkb);
- WARN_ON_ONCE(1);
- goto try_another;
- case DLM_DEQUEUE_CALLBACK_LAST:
- list_del_init(&lkb->lkb_cb_list);
- clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
- break;
- case DLM_DEQUEUE_CALLBACK_SUCCESS:
- break;
- default:
- WARN_ON_ONCE(1);
- break;
- }
+ cb = list_first_entry(&proc->asts, struct dlm_callback, list);
+ list_del(&cb->list);
spin_unlock(&proc->asts_spin);
- rsb = lkb->lkb_resource;
if (cb->flags & DLM_CB_BAST) {
- trace_dlm_bast(rsb->res_ls->ls_global_id, lkb->lkb_id,
- cb->mode, rsb->res_name, rsb->res_length);
+ trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
+ cb->res_length);
} else if (cb->flags & DLM_CB_CAST) {
- lkb->lkb_lksb->sb_status = cb->sb_status;
- lkb->lkb_lksb->sb_flags = cb->sb_flags;
- trace_dlm_ast(rsb->res_ls->ls_global_id, lkb->lkb_id,
- cb->sb_flags, cb->sb_status, rsb->res_name,
- rsb->res_length);
+ cb->lkb_lksb->sb_status = cb->sb_status;
+ cb->lkb_lksb->sb_flags = cb->sb_flags;
+ trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
+ cb->sb_flags, cb->res_name, cb->res_length);
}
- ret = copy_result_to_user(lkb->lkb_ua,
+ ret = copy_result_to_user(&cb->ua,
test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
cb->flags, cb->mode, cb->copy_lvb, buf, count);
-
kref_put(&cb->ref, dlm_release_callback);
-
- /* removes ref for proc->asts, may cause lkb to be freed */
- if (rv == DLM_DEQUEUE_CALLBACK_LAST)
- dlm_put_lkb(lkb);
-
return ret;
}