summaryrefslogtreecommitdiff
path: root/fs/dlm/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/dir.c')
-rw-r--r--fs/dlm/dir.c157
1 files changed, 125 insertions, 32 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index f6acba4310a7..b1ab0adbd9d0 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -47,15 +47,13 @@ int dlm_dir_nodeid(struct dlm_rsb *r)
return r->res_dir_nodeid;
}
-void dlm_recover_dir_nodeid(struct dlm_ls *ls)
+void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
{
struct dlm_rsb *r;
- down_read(&ls->ls_root_sem);
- list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+ list_for_each_entry(r, root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
- up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
@@ -200,35 +198,98 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
int len)
{
struct dlm_rsb *r;
- uint32_t hash, bucket;
int rv;
- hash = jhash(name, len, 0);
- bucket = hash & (ls->ls_rsbtbl_size - 1);
-
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
- if (rv)
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
- name, len, &r);
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-
+ read_lock_bh(&ls->ls_rsbtbl_lock);
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
if (!rv)
return r;
- down_read(&ls->ls_root_sem);
- list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+ list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
- up_read(&ls->ls_root_sem);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
- up_read(&ls->ls_root_sem);
return NULL;
}
+struct dlm_dir_dump {
+ /* init values to match if whole
+ * dump fits to one seq. Sanity check only.
+ */
+ uint64_t seq_init;
+ uint64_t nodeid_init;
+ /* compare local pointer with last lookup,
+ * just a sanity check.
+ */
+ struct list_head *last;
+
+ unsigned int sent_res; /* for log info */
+ unsigned int sent_msg; /* for log info */
+
+ struct list_head list;
+};
+
+static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_dir_dump *dd, *safe;
+
+ write_lock_bh(&ls->ls_dir_dump_lock);
+ list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
+ if (dd->nodeid_init == nodeid) {
+ log_error(ls, "drop dump seq %llu",
+ (unsigned long long)dd->seq_init);
+ list_del(&dd->list);
+ kfree(dd);
+ }
+ }
+ write_unlock_bh(&ls->ls_dir_dump_lock);
+}
+
+static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_dir_dump *iter, *dd = NULL;
+
+ read_lock_bh(&ls->ls_dir_dump_lock);
+ list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
+ if (iter->nodeid_init == nodeid) {
+ dd = iter;
+ break;
+ }
+ }
+ read_unlock_bh(&ls->ls_dir_dump_lock);
+
+ return dd;
+}
+
+static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
+{
+ struct dlm_dir_dump *dd;
+
+ dd = lookup_dir_dump(ls, nodeid);
+ if (dd) {
+ log_error(ls, "found ongoing dir dump for node %d, will drop it",
+ nodeid);
+ drop_dir_ctx(ls, nodeid);
+ }
+
+ dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
+ if (!dd)
+ return NULL;
+
+ dd->seq_init = ls->ls_recover_seq;
+ dd->nodeid_init = nodeid;
+
+ write_lock_bh(&ls->ls_dir_dump_lock);
+ list_add(&dd->list, &ls->ls_dir_dump_list);
+ write_unlock_bh(&ls->ls_dir_dump_lock);
+
+ return dd;
+}
+
/* Find the rsb where we left off (or start again), then send rsb names
for rsb's we're master of and whose directory node matches the requesting
node. inbuf is the rsb name last sent, inlen is the name's length */
@@ -239,27 +300,50 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
struct list_head *list;
struct dlm_rsb *r;
int offset = 0, dir_nodeid;
+ struct dlm_dir_dump *dd;
__be16 be_namelen;
- down_read(&ls->ls_root_sem);
+ read_lock_bh(&ls->ls_masters_lock);
if (inlen > 1) {
+ dd = lookup_dir_dump(ls, nodeid);
+ if (!dd) {
+ log_error(ls, "failed to lookup dir dump context nodeid: %d",
+ nodeid);
+ goto out;
+ }
+
+ /* next chunk in dump */
r = find_rsb_root(ls, inbuf, inlen);
if (!r) {
log_error(ls, "copy_master_names from %d start %d %.*s",
nodeid, inlen, inlen, inbuf);
goto out;
}
- list = r->res_root_list.next;
+ list = r->res_masters_list.next;
+
+ /* sanity checks */
+ if (dd->last != &r->res_masters_list ||
+ dd->seq_init != ls->ls_recover_seq) {
+ log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
+ (unsigned long long)dd->seq_init,
+ (unsigned long long)ls->ls_recover_seq);
+ goto out;
+ }
} else {
- list = ls->ls_root_list.next;
- }
+ dd = init_dir_dump(ls, nodeid);
+ if (!dd) {
+ log_error(ls, "failed to allocate dir dump context");
+ goto out;
+ }
- for (offset = 0; list != &ls->ls_root_list; list = list->next) {
- r = list_entry(list, struct dlm_rsb, res_root_list);
- if (r->res_nodeid)
- continue;
+ /* start dump */
+ list = ls->ls_masters_list.next;
+ dd->last = list;
+ }
+ for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
+ r = list_entry(list, struct dlm_rsb, res_masters_list);
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid != nodeid)
continue;
@@ -277,7 +361,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
be_namelen = cpu_to_be16(0);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
- ls->ls_recover_dir_sent_msg++;
+ dd->sent_msg++;
goto out;
}
@@ -286,7 +370,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length;
- ls->ls_recover_dir_sent_res++;
+ dd->sent_res++;
+ dd->last = list;
}
/*
@@ -294,14 +379,22 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
* terminating record.
*/
- if ((list == &ls->ls_root_list) &&
+ if ((list == &ls->ls_masters_list) &&
(offset + sizeof(uint16_t) <= outlen)) {
+ /* end dump */
be_namelen = cpu_to_be16(0xFFFF);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
- ls->ls_recover_dir_sent_msg++;
+ dd->sent_msg++;
+ log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
+ nodeid, dd->sent_res, dd->sent_msg);
+
+ write_lock_bh(&ls->ls_dir_dump_lock);
+ list_del_init(&dd->list);
+ write_unlock_bh(&ls->ls_dir_dump_lock);
+ kfree(dd);
}
out:
- up_read(&ls->ls_root_sem);
+ read_unlock_bh(&ls->ls_masters_lock);
}