dlm: fix conversion deadlock from recovery
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lock.c
index c7c6cf9e868535b008842c2997448b99d0e190ee..04e3f15aa0ccc88ea374384746cd965c31e53b48 100644 (file)
@@ -2279,10 +2279,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
  * immediate request, it is 0 if called later, after the lock has been
  * queued.
  *
+ * recover is 1 if dlm_recover_grant() is trying to grant conversions
+ * after recovery.
+ *
  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
  */
 
-static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
+static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
+                          int recover)
 {
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
 
@@ -2314,7 +2318,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
         */
 
        if (queue_conflict(&r->res_grantqueue, lkb))
-               goto out;
+               return 0;
 
        /*
         * 6-3: By default, a conversion request is immediately granted if the
@@ -2323,7 +2327,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
         */
 
        if (queue_conflict(&r->res_convertqueue, lkb))
-               goto out;
+               return 0;
+
+       /*
+        * The RECOVER_GRANT flag means dlm_recover_grant() is granting
+        * locks for a recovered rsb, on which lkb's have been rebuilt.
+        * The lkb's may have been rebuilt on the queues in a different
+        * order than they were in on the previous master.  So, granting
+        * queued conversions in order after recovery doesn't make sense
+        * since the order hasn't been preserved anyway.  The new order
+        * could also have created a new "in place" conversion deadlock.
+        * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
+        * After recovery, there would be no granted locks, and possibly
+        * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
+        * recovery, grant conversions without considering order.
+        */
+
+       if (conv && recover)
+               return 1;
 
        /*
         * 6-5: But the default algorithm for deciding whether to grant or
@@ -2360,7 +2381,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
                if (list_empty(&r->res_convertqueue))
                        return 1;
                else
-                       goto out;
+                       return 0;
        }
 
        /*
@@ -2406,12 +2427,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;
- out:
+
        return 0;
 }
 
 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
-                         int *err)
+                         int recover, int *err)
 {
        int rv;
        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
@@ -2420,7 +2441,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
        if (err)
                *err = 0;
 
-       rv = _can_be_granted(r, lkb, now);
+       rv = _can_be_granted(r, lkb, now, recover);
        if (rv)
                goto out;
 
@@ -2461,7 +2482,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 
        if (alt) {
                lkb->lkb_rqmode = alt;
-               rv = _can_be_granted(r, lkb, now);
+               rv = _can_be_granted(r, lkb, now, 0);
                if (rv)
                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                else
@@ -2485,6 +2506,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
                                 unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
+       int recover = rsb_flag(r, RSB_RECOVER_GRANT);
        int hi, demoted, quit, grant_restart, demote_restart;
        int deadlk;
 
@@ -2498,7 +2520,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
                demoted = is_demoted(lkb);
                deadlk = 0;
 
-               if (can_be_granted(r, lkb, 0, &deadlk)) {
+               if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
                        if (count)
@@ -2542,7 +2564,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-               if (can_be_granted(r, lkb, 0, NULL)) {
+               if (can_be_granted(r, lkb, 0, 0, NULL)) {
                        grant_lock_pending(r, lkb);
                        if (count)
                                (*count)++;
@@ -3042,7 +3064,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        int error = 0;
 
-       if (can_be_granted(r, lkb, 1, NULL)) {
+       if (can_be_granted(r, lkb, 1, 0, NULL)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                goto out;
@@ -3082,7 +3104,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
        /* changing an existing lock may allow others to be granted */
 
-       if (can_be_granted(r, lkb, 1, &deadlk)) {
+       if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                goto out;
@@ -3108,7 +3130,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
        if (is_demoted(lkb)) {
                grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
-               if (_can_be_granted(r, lkb, 1)) {
+               if (_can_be_granted(r, lkb, 1, 0)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
                        goto out;
@@ -5373,9 +5395,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 
                if (!rsb_flag(r, RSB_RECOVER_GRANT))
                        continue;
-               rsb_clear_flag(r, RSB_RECOVER_GRANT);
-               if (!is_master(r))
+               if (!is_master(r)) {
+                       rsb_clear_flag(r, RSB_RECOVER_GRANT);
                        continue;
+               }
                hold_rsb(r);
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                return r;
@@ -5420,7 +5443,9 @@ void dlm_recover_grant(struct dlm_ls *ls)
                rsb_count++;
                count = 0;
                lock_rsb(r);
+               /* the RECOVER_GRANT flag is checked in the grant path */
                grant_pending_locks(r, &count);
+               rsb_clear_flag(r, RSB_RECOVER_GRANT);
                lkb_count += count;
                confirm_master(r, 0);
                unlock_rsb(r);