Merge branch 'stable/for-linus-3.6' into linux-next
[firefly-linux-kernel-4.4.55.git] / fs / dlm / recover.c
index 7554e4dac6bbbc0290ac26a1a75438a1551d5060..4a7a76e42fc365e603e5ec9af26089ef9fe7d727 100644 (file)
  * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
  * function thinks it could have completed the waited-on task, they should wake
  * up ls_wait_general to get an immediate response rather than waiting for the
- * timer to detect the result.  A timer wakes us up periodically while waiting
- * to see if we should abort due to a node failure.  This should only be called
- * by the dlm_recoverd thread.
+ * timeout.  This uses a timeout so it can check periodically if the wait
+ * should abort due to node failure (which doesn't cause a wake_up).
+ * This should only be called by the dlm_recoverd thread.
  */
 
-static void dlm_wait_timer_fn(unsigned long data)
-{
-       struct dlm_ls *ls = (struct dlm_ls *) data;
-       mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
-       wake_up(&ls->ls_wait_general);
-}
-
 int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 {
        int error = 0;
+       int rv;
 
-       init_timer(&ls->ls_timer);
-       ls->ls_timer.function = dlm_wait_timer_fn;
-       ls->ls_timer.data = (long) ls;
-       ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
-       add_timer(&ls->ls_timer);
-
-       wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
-       del_timer_sync(&ls->ls_timer);
+       while (1) {
+               rv = wait_event_timeout(ls->ls_wait_general,
+                                       testfn(ls) || dlm_recovery_stopped(ls),
+                                       dlm_config.ci_recover_timer * HZ);
+               if (rv)
+                       break;
+       }
 
        if (dlm_recovery_stopped(ls)) {
                log_debug(ls, "dlm_wait_function aborted");
@@ -277,22 +270,6 @@ static void recover_list_del(struct dlm_rsb *r)
        dlm_put_rsb(r);
 }
 
-static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
-{
-       struct dlm_rsb *r = NULL;
-
-       spin_lock(&ls->ls_recover_list_lock);
-
-       list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
-               if (id == (unsigned long) r)
-                       goto out;
-       }
-       r = NULL;
- out:
-       spin_unlock(&ls->ls_recover_list_lock);
-       return r;
-}
-
 static void recover_list_clear(struct dlm_ls *ls)
 {
        struct dlm_rsb *r, *s;
@@ -313,6 +290,94 @@ static void recover_list_clear(struct dlm_ls *ls)
        spin_unlock(&ls->ls_recover_list_lock);
 }
 
+/* Test function handed to dlm_wait_function() by dlm_recover_masters():
+   returns 1 when ls_recover_list_count is zero and 0 otherwise.  The
+   counter is read under ls_recover_idr_lock. */
+static int recover_idr_empty(struct dlm_ls *ls)
+{
+       int empty = 1;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       if (ls->ls_recover_list_count)
+               empty = 0;
+       spin_unlock(&ls->ls_recover_idr_lock);
+
+       return empty;
+}
+
+/* Register r in ls_recover_idr and store the allocated id in r->res_id.
+   Returns 0 on success, -ENOMEM if idr_pre_get() fails, -1 if r->res_id
+   is already nonzero, or the error returned by idr_get_new_above().
+   On success ls_recover_list_count is incremented and dlm_hold_rsb() is
+   called on r; recover_idr_del() performs the matching decrement and
+   dlm_put_rsb(). */
+static int recover_idr_add(struct dlm_rsb *r)
+{
+       struct dlm_ls *ls = r->res_ls;
+       int rv, id;
+
+       /* preallocate idr memory before the spinlock is taken */
+       rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
+       if (!rv)
+               return -ENOMEM;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       /* a nonzero res_id means r already has an id assigned */
+       if (r->res_id) {
+               spin_unlock(&ls->ls_recover_idr_lock);
+               return -1;
+       }
+       /* ids are allocated starting at 1, which leaves 0 free to act as
+          the "no id" value tested above */
+       rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
+       if (rv) {
+               spin_unlock(&ls->ls_recover_idr_lock);
+               return rv;
+       }
+       r->res_id = id;
+       ls->ls_recover_list_count++;
+       dlm_hold_rsb(r);
+       spin_unlock(&ls->ls_recover_idr_lock);
+       return 0;
+}
+
+/* Remove r from ls_recover_idr: drop its id from the idr, reset
+   r->res_id to 0 and decrement ls_recover_list_count, all under
+   ls_recover_idr_lock.  dlm_put_rsb() is called after the lock has
+   been released. */
+static void recover_idr_del(struct dlm_rsb *r)
+{
+       struct dlm_ls *ls = r->res_ls;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       idr_remove(&ls->ls_recover_idr, r->res_id);
+       r->res_id = 0;
+       ls->ls_recover_list_count--;
+       spin_unlock(&ls->ls_recover_idr_lock);
+
+       dlm_put_rsb(r);
+}
+
+/* Look up the rsb registered under id in ls_recover_idr, holding
+   ls_recover_idr_lock for the lookup.  Returns whatever idr_find()
+   returns; dlm_recover_master_reply() treats NULL as "no such id".
+   NOTE(review): the 64-bit id is narrowed to int for idr_find(), so an
+   id with bits set above the int range would alias a smaller id.
+   NOTE(review): no reference is taken on the returned rsb here;
+   presumably the hold taken in recover_idr_add() keeps it valid for
+   the caller -- confirm. */
+static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
+{
+       struct dlm_rsb *r;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       r = idr_find(&ls->ls_recover_idr, (int)id);
+       spin_unlock(&ls->ls_recover_idr_lock);
+       return r;
+}
+
+/* idr_for_each() callback used by recover_idr_clear(), which invokes it
+   with ls_recover_idr_lock held.  p is the rsb stored in the idr and
+   data is the lockspace.  Resets the rsb's res_id and
+   res_recover_locks_count, decrements ls_recover_list_count and calls
+   dlm_put_rsb().  Always returns 0 so that idr_for_each() goes on to
+   the next entry. */
+static int recover_idr_clear_rsb(int id, void *p, void *data)
+{
+       struct dlm_ls *ls = data;
+       struct dlm_rsb *r = p;
+
+       r->res_id = 0;
+       r->res_recover_locks_count = 0;
+       ls->ls_recover_list_count--;
+
+       dlm_put_rsb(r);
+       return 0;
+}
+
+/* Empty ls_recover_idr under ls_recover_idr_lock: run
+   recover_idr_clear_rsb() on every entry, then remove all ids from the
+   idr.  If ls_recover_list_count is still nonzero afterwards, an error
+   is logged and the counter is forced back to 0. */
+static void recover_idr_clear(struct dlm_ls *ls)
+{
+       spin_lock(&ls->ls_recover_idr_lock);
+       idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
+       idr_remove_all(&ls->ls_recover_idr);
+
+       /* the callback decrements once per entry, so anything left over
+          means the counter and the idr contents disagreed */
+       if (ls->ls_recover_list_count != 0) {
+               log_error(ls, "warning: recover_list_count %d",
+                         ls->ls_recover_list_count);
+               ls->ls_recover_list_count = 0;
+       }
+       spin_unlock(&ls->ls_recover_idr_lock);
+}
+
 
 /* Master recovery: find new master node for rsb's that were
    mastered on nodes that have been removed.
@@ -361,9 +426,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
  * rsb's to consider.
  */
 
-static void set_new_master(struct dlm_rsb *r, int nodeid)
+static void set_new_master(struct dlm_rsb *r)
 {
-       r->res_nodeid = nodeid;
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +436,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 /*
  * We do async lookups on rsb's that need new masters.  The rsb's
  * waiting for a lookup reply are kept on the recover_list.
+ *
+ * Another node recovering the master may have sent us a rcom lookup,
+ * and our dlm_master_lookup() set it as the new master, along with
+ * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
+ * equals our_nodeid below).
  */
 
-static int recover_master(struct dlm_rsb *r)
+static int recover_master(struct dlm_rsb *r, unsigned int *count)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, ret_nodeid;
-       int our_nodeid = dlm_our_nodeid();
-       int dir_nodeid = dlm_dir_nodeid(r);
+       int our_nodeid, dir_nodeid;
+       int is_removed = 0;
+       int error;
+
+       /* nothing to do for an rsb we already master */
+       if (is_master(r))
+               return 0;
+
+       is_removed = dlm_is_removed(ls, r->res_nodeid);
+
+       /* skip rsbs whose master node was not removed, unless
+          RSB_NEW_MASTER is set on them */
+       if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
+               return 0;
+
+       our_nodeid = dlm_our_nodeid();
+       dir_nodeid = dlm_dir_nodeid(r);
 
        if (dir_nodeid == our_nodeid) {
-               error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
-                                      r->res_length, &ret_nodeid);
-               if (error)
-                       log_error(ls, "recover dir lookup error %d", error);
+               if (is_removed) {
+                       r->res_master_nodeid = our_nodeid;
+                       r->res_nodeid = 0;
+               }
 
-               if (ret_nodeid == our_nodeid)
-                       ret_nodeid = 0;
-               lock_rsb(r);
-               set_new_master(r, ret_nodeid);
-               unlock_rsb(r);
+               /* set master of lkbs to ourself when is_removed, or to
+                  another new master which we set along with NEW_MASTER
+                  in dlm_master_lookup */
+               set_new_master(r);
+               error = 0;
        } else {
-               recover_list_add(r);
+               /* NOTE(review): the return value of recover_idr_add() is
+                  ignored; if it fails, r has no idr entry and
+                  dlm_recover_master_reply() would presumably not find
+                  it when the lookup reply arrives -- confirm */
+               recover_idr_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }
 
+       /* counted whenever recovery was attempted for r, including when
+          dlm_send_rcom_lookup() returned an error */
+       (*count)++;
        return error;
 }
 
@@ -415,7 +496,7 @@ static int recover_master(struct dlm_rsb *r)
  * resent.
  */
 
-static int recover_master_static(struct dlm_rsb *r)
+static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
        int dir_nodeid = dlm_dir_nodeid(r);
        int new_master = dir_nodeid;
@@ -423,11 +504,12 @@ static int recover_master_static(struct dlm_rsb *r)
        if (dir_nodeid == dlm_our_nodeid())
                new_master = 0;
 
-       lock_rsb(r);
        dlm_purge_mstcpy_locks(r);
-       set_new_master(r, new_master);
-       unlock_rsb(r);
-       return 1;
+       r->res_master_nodeid = dir_nodeid;
+       r->res_nodeid = new_master;
+       set_new_master(r);
+       (*count)++;
+       return 0;
 }
 
 /*
@@ -443,7 +525,10 @@ static int recover_master_static(struct dlm_rsb *r)
 int dlm_recover_masters(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
-       int error = 0, count = 0;
+       unsigned int total = 0;
+       unsigned int count = 0;
+       int nodir = dlm_no_directory(ls);
+       int error;
 
        log_debug(ls, "dlm_recover_masters");
 
@@ -455,50 +540,58 @@ int dlm_recover_masters(struct dlm_ls *ls)
                        goto out;
                }
 
-               if (dlm_no_directory(ls))
-                       count += recover_master_static(r);
-               else if (!is_master(r) &&
-                        (dlm_is_removed(ls, r->res_nodeid) ||
-                         rsb_flag(r, RSB_NEW_MASTER))) {
-                       recover_master(r);
-                       count++;
-               }
+               lock_rsb(r);
+               if (nodir)
+                       error = recover_master_static(r, &count);
+               else
+                       error = recover_master(r, &count);
+               unlock_rsb(r);
+               cond_resched();
+               total++;
 
-               schedule();
+               if (error) {
+                       up_read(&ls->ls_root_sem);
+                       goto out;
+               }
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_masters %d resources", count);
+       log_debug(ls, "dlm_recover_masters %u of %u", count, total);
 
-       error = dlm_wait_function(ls, &recover_list_empty);
+       error = dlm_wait_function(ls, &recover_idr_empty);
  out:
        if (error)
-               recover_list_clear(ls);
+               recover_idr_clear(ls);
        return error;
 }
 
 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
        struct dlm_rsb *r;
-       int nodeid;
+       int ret_nodeid, new_master;
 
-       r = recover_list_find(ls, rc->rc_id);
+       r = recover_idr_find(ls, rc->rc_id);
        if (!r) {
                log_error(ls, "dlm_recover_master_reply no id %llx",
                          (unsigned long long)rc->rc_id);
                goto out;
        }
 
-       nodeid = rc->rc_result;
-       if (nodeid == dlm_our_nodeid())
-               nodeid = 0;
+       ret_nodeid = rc->rc_result;
+
+       if (ret_nodeid == dlm_our_nodeid())
+               new_master = 0;
+       else
+               new_master = ret_nodeid;
 
        lock_rsb(r);
-       set_new_master(r, nodeid);
+       r->res_master_nodeid = ret_nodeid;
+       r->res_nodeid = new_master;
+       set_new_master(r);
        unlock_rsb(r);
-       recover_list_del(r);
+       recover_idr_del(r);
 
-       if (recover_list_empty(ls))
+       if (recover_idr_empty(ls))
                wake_up(&ls->ls_wait_general);
  out:
        return 0;
@@ -711,6 +804,7 @@ static void recover_lvb(struct dlm_rsb *r)
 
 static void recover_conversion(struct dlm_rsb *r)
 {
+       struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb;
        int grmode = -1;
 
@@ -725,10 +819,15 @@ static void recover_conversion(struct dlm_rsb *r)
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (lkb->lkb_grmode != DLM_LOCK_IV)
                        continue;
-               if (grmode == -1)
+               if (grmode == -1) {
+                       log_debug(ls, "recover_conversion %x set gr to rq %d",
+                                 lkb->lkb_id, lkb->lkb_rqmode);
                        lkb->lkb_grmode = lkb->lkb_rqmode;
-               else
+               } else {
+                       log_debug(ls, "recover_conversion %x set gr %d",
+                                 lkb->lkb_id, grmode);
                        lkb->lkb_grmode = grmode;
+               }
        }
 }
 
@@ -791,20 +890,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
                        dlm_hold_rsb(r);
                }
 
-               /* If we're using a directory, add tossed rsbs to the root
-                  list; they'll have entries created in the new directory,
-                  but no other recovery steps should do anything with them. */
-
-               if (dlm_no_directory(ls)) {
-                       spin_unlock(&ls->ls_rsbtbl[i].lock);
-                       continue;
-               }
-
-               for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
-                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       list_add(&r->res_root_list, &ls->ls_root_list);
-                       dlm_hold_rsb(r);
-               }
+               if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+                       log_error(ls, "dlm_create_root_list toss not empty");
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
  out:
@@ -824,28 +911,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
        up_write(&ls->ls_root_sem);
 }
 
-/* If not using a directory, clear the entire toss list, there's no benefit to
-   caching the master value since it's fixed.  If we are using a dir, keep the
-   rsb's we're the master of.  Recovery will add them to the root list and from
-   there they'll be entered in the rebuilt directory. */
-
-void dlm_clear_toss_list(struct dlm_ls *ls)
+/* Erase and free every rsb on the toss tree of each ls_rsbtbl bucket,
+   holding that bucket's lock while its tree is walked.  If any rsbs
+   were freed, the total is reported through log_debug(). */
+void dlm_clear_toss(struct dlm_ls *ls)
 {
        struct rb_node *n, *next;
-       struct dlm_rsb *rsb;
+       struct dlm_rsb *r;
+       unsigned int count = 0;
        int i;
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-                       next = rb_next(n);;
-                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       if (dlm_no_directory(ls) || !is_master(rsb)) {
-                               rb_erase(n, &ls->ls_rsbtbl[i].toss);
-                               dlm_free_rsb(rsb);
-                       }
+                       /* fetch the successor before n is erased */
+                       next = rb_next(n);
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       rb_erase(n, &ls->ls_rsbtbl[i].toss);
+                       dlm_free_rsb(r);
+                       count++;
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
+
+       if (count)
+               log_debug(ls, "dlm_clear_toss %u done", count);
 }