dlm: use rsbtbl as resource directory
[firefly-linux-kernel-4.4.55.git] / fs / dlm / recover.c
index 7554e4dac6bbbc0290ac26a1a75438a1551d5060..3c025fe49ad3c0846455aeb77863d990f071e74a 100644 (file)
@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
  * rsb's to consider.
  */
 
-static void set_new_master(struct dlm_rsb *r, int nodeid)
+static void set_new_master(struct dlm_rsb *r)
 {
-       r->res_nodeid = nodeid;
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 /*
  * We do async lookups on rsb's that need new masters.  The rsb's
  * waiting for a lookup reply are kept on the recover_list.
+ *
+ * Another node recovering the master may have sent us a rcom lookup,
+ * and our dlm_master_lookup() set it as the new master, along with
+ * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
+ * equals our_nodeid below).
  */
 
-static int recover_master(struct dlm_rsb *r)
+static int recover_master(struct dlm_rsb *r, unsigned int *count)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, ret_nodeid;
-       int our_nodeid = dlm_our_nodeid();
-       int dir_nodeid = dlm_dir_nodeid(r);
+       int our_nodeid, dir_nodeid;
+       int is_removed = 0;
+       int error;
+
+       if (is_master(r))
+               return 0;
+
+       is_removed = dlm_is_removed(ls, r->res_nodeid);
+
+       if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
+               return 0;
+
+       our_nodeid = dlm_our_nodeid();
+       dir_nodeid = dlm_dir_nodeid(r);
 
        if (dir_nodeid == our_nodeid) {
-               error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
-                                      r->res_length, &ret_nodeid);
-               if (error)
-                       log_error(ls, "recover dir lookup error %d", error);
+               if (is_removed) {
+                       r->res_master_nodeid = our_nodeid;
+                       r->res_nodeid = 0;
+               }
 
-               if (ret_nodeid == our_nodeid)
-                       ret_nodeid = 0;
-               lock_rsb(r);
-               set_new_master(r, ret_nodeid);
-               unlock_rsb(r);
+               /* set master of lkbs to ourself when is_removed, or to
+                  another new master which we set along with NEW_MASTER
+                  in dlm_master_lookup */
+               set_new_master(r);
+               error = 0;
        } else {
                recover_list_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }
 
+       (*count)++;
        return error;
 }
 
@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
  * resent.
  */
 
-static int recover_master_static(struct dlm_rsb *r)
+static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
        int dir_nodeid = dlm_dir_nodeid(r);
        int new_master = dir_nodeid;
@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
        if (dir_nodeid == dlm_our_nodeid())
                new_master = 0;
 
-       lock_rsb(r);
        dlm_purge_mstcpy_locks(r);
-       set_new_master(r, new_master);
-       unlock_rsb(r);
-       return 1;
+       r->res_master_nodeid = dir_nodeid;
+       r->res_nodeid = new_master;
+       set_new_master(r);
+       (*count)++;
+       return 0;
 }
 
 /*
@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
 int dlm_recover_masters(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
-       int error = 0, count = 0;
+       unsigned int total = 0;
+       unsigned int count = 0;
+       int nodir = dlm_no_directory(ls);
+       int error;
 
        log_debug(ls, "dlm_recover_masters");
 
@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
                        goto out;
                }
 
-               if (dlm_no_directory(ls))
-                       count += recover_master_static(r);
-               else if (!is_master(r) &&
-                        (dlm_is_removed(ls, r->res_nodeid) ||
-                         rsb_flag(r, RSB_NEW_MASTER))) {
-                       recover_master(r);
-                       count++;
-               }
+               lock_rsb(r);
+               if (nodir)
+                       error = recover_master_static(r, &count);
+               else
+                       error = recover_master(r, &count);
+               unlock_rsb(r);
+               cond_resched();
+               total++;
 
-               schedule();
+               if (error) {
+                       up_read(&ls->ls_root_sem);
+                       goto out;
+               }
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_masters %d resources", count);
+       log_debug(ls, "dlm_recover_masters %u of %u", count, total);
 
        error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct dlm_rsb *r;
-       int nodeid;
+       int ret_nodeid, new_master;
 
        r = recover_list_find(ls, rc->rc_id);
        if (!r) {
@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
                goto out;
        }
 
-       nodeid = rc->rc_result;
-       if (nodeid == dlm_our_nodeid())
-               nodeid = 0;
+       ret_nodeid = rc->rc_result;
+
+       if (ret_nodeid == dlm_our_nodeid())
+               new_master = 0;
+       else
+               new_master = ret_nodeid;
 
        lock_rsb(r);
-       set_new_master(r, nodeid);
+       r->res_master_nodeid = ret_nodeid;
+       r->res_nodeid = new_master;
+       set_new_master(r);
        unlock_rsb(r);
        recover_list_del(r);
 
@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
                        dlm_hold_rsb(r);
                }
 
-               /* If we're using a directory, add tossed rsbs to the root
-                  list; they'll have entries created in the new directory,
-                  but no other recovery steps should do anything with them. */
-
-               if (dlm_no_directory(ls)) {
-                       spin_unlock(&ls->ls_rsbtbl[i].lock);
-                       continue;
-               }
-
-               for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
-                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       list_add(&r->res_root_list, &ls->ls_root_list);
-                       dlm_hold_rsb(r);
-               }
+               if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+                       log_error(ls, "dlm_create_root_list toss not empty");
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
  out:
@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
        up_write(&ls->ls_root_sem);
 }
 
-/* If not using a directory, clear the entire toss list, there's no benefit to
-   caching the master value since it's fixed.  If we are using a dir, keep the
-   rsb's we're the master of.  Recovery will add them to the root list and from
-   there they'll be entered in the rebuilt directory. */
-
-void dlm_clear_toss_list(struct dlm_ls *ls)
+void dlm_clear_toss(struct dlm_ls *ls)
 {
        struct rb_node *n, *next;
-       struct dlm_rsb *rsb;
+       struct dlm_rsb *r;
+       unsigned int count = 0;
        int i;
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-                       next = rb_next(n);;
-                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       if (dlm_no_directory(ls) || !is_master(rsb)) {
-                               rb_erase(n, &ls->ls_rsbtbl[i].toss);
-                               dlm_free_rsb(rsb);
-                       }
+                       next = rb_next(n);
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       rb_erase(n, &ls->ls_rsbtbl[i].toss);
+                       dlm_free_rsb(r);
+                       count++;
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
+
+       if (count)
+               log_debug(ls, "dlm_clear_toss %u done", count);
 }