ocfs2/dlm: Print more messages during lock migration
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index d9fa3d22e17c6d27134327d3fd96199038913ed0..cfb2ae9ab5383deaca8927a4b5131a3ab38838d9 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1050,7 +1050,7 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
                                if (lock->ml.node == dead_node) {
                                        mlog(0, "AHA! there was "
                                             "a $RECOVERY lock for dead "
-                                            "node %u (%s)!\n", 
+                                            "node %u (%s)!\n",
                                             dead_node, dlm->name);
                                        list_del_init(&lock->list);
                                        dlm_lock_put(lock);
@@ -1164,6 +1164,39 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
        mres->master = master;
 }
 
+static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock,
+                                         struct dlm_migratable_lockres *mres,
+                                         int queue)
+{
+       if (!lock->lksb)
+               return;
+
+       /* Ignore lvb in all locks in the blocked list */
+       if (queue == DLM_BLOCKED_LIST)
+               return;
+
+       /* Only consider lvbs in locks with granted EX or PR lock levels */
+       if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE)
+               return;
+
+       if (dlm_lvb_is_empty(mres->lvb)) {
+               memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
+               return;
+       }
+
+       /* Ensure the lvb copied for migration matches in other valid locks */
+       if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))
+               return;
+
+       mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, "
+            "node=%u\n",
+            dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+            dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
+            lock->lockres->lockname.len, lock->lockres->lockname.name,
+            lock->ml.node);
+       dlm_print_one_lock_resource(lock->lockres);
+       BUG();
+}
 
 /* returns 1 if this lock fills the network structure,
  * 0 otherwise */
@@ -1181,20 +1214,7 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
        ml->list = queue;
        if (lock->lksb) {
                ml->flags = lock->lksb->flags;
-               /* send our current lvb */
-               if (ml->type == LKM_EXMODE ||
-                   ml->type == LKM_PRMODE) {
-                       /* if it is already set, this had better be a PR
-                        * and it has to match */
-                       if (!dlm_lvb_is_empty(mres->lvb) &&
-                           (ml->type == LKM_EXMODE ||
-                            memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
-                               mlog(ML_ERROR, "mismatched lvbs!\n");
-                               dlm_print_one_lock_resource(lock->lockres);
-                               BUG();
-                       }
-                       memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
-               }
+               dlm_prepare_lvb_for_migration(lock, mres, queue);
        }
        ml->node = lock->ml.node;
        mres->num_locks++;
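
The two hunks above are more than a code move. The old inline check
BUG()'d whenever mres->lvb was already populated and the incoming lock
was EX, even if the two lvbs were byte-identical; the new
dlm_prepare_lvb_for_migration() only BUG()s on an actual memcmp()
mismatch, and it now skips blocked-list locks explicitly instead of
filtering on ml->type alone. A userspace sketch of the merge rules,
with stand-in constants and plain buffers rather than the kernel's
types:

#include <stdio.h>
#include <string.h>

#define DLM_LVB_LEN      64   /* mirrors fs/ocfs2/dlm; verify in your tree */
#define LKM_PRMODE        3   /* illustrative values only */
#define LKM_EXMODE        4
#define DLM_BLOCKED_LIST  2

static int lvb_is_empty(const char *lvb)
{
	int i;

	for (i = 0; i < DLM_LVB_LEN; i++)
		if (lvb[i])
			return 0;
	return 1;
}

/* Same rules as dlm_prepare_lvb_for_migration(): blocked locks and
 * non-EX/PR grants are ignored, the first valid lvb seeds mres->lvb,
 * and every later valid lvb must match it byte for byte.  Returns -1
 * where the kernel would log and BUG(). */
static int merge_lvb(char *mres_lvb, const char *lock_lvb, int type, int queue)
{
	if (queue == DLM_BLOCKED_LIST)
		return 0;
	if (type != LKM_EXMODE && type != LKM_PRMODE)
		return 0;
	if (lvb_is_empty(mres_lvb)) {
		memcpy(mres_lvb, lock_lvb, DLM_LVB_LEN);
		return 0;
	}
	return memcmp(mres_lvb, lock_lvb, DLM_LVB_LEN) ? -1 : 0;
}

int main(void)
{
	char mres[DLM_LVB_LEN] = { 0 };
	char v1[DLM_LVB_LEN] = "version-1";
	char v2[DLM_LVB_LEN] = "version-2";

	printf("seed with EX lvb : %d\n", merge_lvb(mres, v1, LKM_EXMODE, 0));
	printf("matching PR lvb  : %d\n", merge_lvb(mres, v1, LKM_PRMODE, 0));
	printf("mismatched lvb   : %d\n", merge_lvb(mres, v2, LKM_PRMODE, 0));
	return 0;
}
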
@@ -1730,6 +1750,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
        struct dlm_lock *lock = NULL;
        u8 from = O2NM_MAX_NODES;
        unsigned int added = 0;
+       __be64 c;
 
        mlog(0, "running %d locks for this lockres\n", mres->num_locks);
        for (i=0; i<mres->num_locks; i++) {
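
Every message added below prints the lock cookie through the same two
helpers, so it is worth recalling the layout they decode: the top byte
of the 64-bit cookie is the number of the node that created the lock,
and the low 56 bits are that node's sequence counter. A paraphrase of
the helpers from fs/ocfs2/dlm/dlmcommon.h (check your tree for the
exact bodies):

/* The cookie travels big-endian on the wire, hence the be64_to_cpu()
 * at every call site in the hunks below. */
static inline u8 dlm_get_lock_cookie_node(u64 cookie)
{
	return (u8)((cookie >> 56) & 0xffULL);	/* creating node */
}

static inline unsigned long long dlm_get_lock_cookie_seq(u64 cookie)
{
	return cookie & 0x00ffffffffffffffULL;	/* per-node sequence */
}

Hoisting the __be64 c declaration to function scope is what lets each
new error path below reuse it instead of redeclaring it in every block,
as the old code did.
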
@@ -1777,19 +1798,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* lock is always created locally first, and
                         * destroyed locally last.  it must be on the list */
                        if (!lock) {
-                               __be64 c = ml->cookie;
-                               mlog(ML_ERROR, "could not find local lock "
-                                              "with cookie %u:%llu!\n",
+                               c = ml->cookie;
+                               mlog(ML_ERROR, "Could not find local lock "
+                                              "with cookie %u:%llu, node %u, "
+                                              "list %u, flags 0x%x, type %d, "
+                                              "conv %d, highest blocked %d\n",
                                     dlm_get_lock_cookie_node(be64_to_cpu(c)),
-                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+                                    ml->node, ml->list, ml->flags, ml->type,
+                                    ml->convert_type, ml->highest_blocked);
+                               __dlm_print_one_lock_resource(res);
+                               BUG();
+                       }
+
+                       if (lock->ml.node != ml->node) {
+                               c = lock->ml.cookie;
+                               mlog(ML_ERROR, "Mismatched node# in lock "
+                                    "cookie %u:%llu, name %.*s, node %u\n",
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+                                    res->lockname.len, res->lockname.name,
+                                    lock->ml.node);
+                               c = ml->cookie;
+                               mlog(ML_ERROR, "Migrate lock cookie %u:%llu, "
+                                    "node %u, list %u, flags 0x%x, type %d, "
+                                    "conv %d, highest blocked %d\n",
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+                                    ml->node, ml->list, ml->flags, ml->type,
+                                    ml->convert_type, ml->highest_blocked);
                                __dlm_print_one_lock_resource(res);
                                BUG();
                        }
-                       BUG_ON(lock->ml.node != ml->node);
 
                        if (tmpq != queue) {
-                               mlog(0, "lock was on %u instead of %u for %.*s\n",
-                                    j, ml->list, res->lockname.len, res->lockname.name);
+                               c = ml->cookie;
+                               mlog(0, "Lock cookie %u:%llu was on list %u "
+                                    "instead of list %u for %.*s\n",
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+                                    j, ml->list, res->lockname.len,
+                                    res->lockname.name);
+                               __dlm_print_one_lock_resource(res);
                                spin_unlock(&res->spinlock);
                                continue;
                        }
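
A migrated lock that belongs to the receiving node must already exist
locally (locks are created locally first and destroyed locally last,
per the comment above), so dlm_process_recovery_data() searches all
three queues for a cookie match. The hunk turns the two fatal cases,
no local lock found and a local lock owned by a different node, into
detailed dumps before the BUG(), and the benign list-mismatch case now
logs the cookie and prints the resource as well. A compressed,
userspace shape of that lookup, with stand-in types instead of the
kernel's list machinery:

enum { DLM_GRANTED_LIST, DLM_CONVERTING_LIST, DLM_BLOCKED_LIST };

struct toy_lock {
	unsigned long long cookie;
	int node;
	struct toy_lock *next;
};

/* Walk granted, converting, blocked in order, as the real loop does.
 * '*which' reports the queue the lock was found on so the caller can
 * detect that it sits on a different list than the migrated copy
 * claims (the "was on list %u instead of list %u" message). */
static struct toy_lock *find_local_lock(struct toy_lock *queues[3],
					unsigned long long cookie, int *which)
{
	struct toy_lock *l;
	int j;

	for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++)
		for (l = queues[j]; l; l = l->next)
			if (l->cookie == cookie) {
				*which = j;
				return l;
			}
	return NULL;	/* patch: dump cookie, list, flags, type, BUG() */
}
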
@@ -1839,7 +1889,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                 * the lvb. */
                                memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
                        } else {
-                               /* otherwise, the node is sending its 
+                               /* otherwise, the node is sending its
                                 * most recent valid lvb info */
                                BUG_ON(ml->type != LKM_EXMODE &&
                                       ml->type != LKM_PRMODE);
@@ -1886,7 +1936,7 @@ skip_lvb:
                spin_lock(&res->spinlock);
                list_for_each_entry(lock, queue, list) {
                        if (lock->ml.cookie == ml->cookie) {
-                               __be64 c = lock->ml.cookie;
+                               c = lock->ml.cookie;
                                mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
                                     "exists on this lockres!\n", dlm->name,
                                     res->lockname.len, res->lockname.name,
@@ -2114,7 +2164,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
        assert_spin_locked(&res->spinlock);
 
        if (res->owner == dlm->node_num)
-               /* if this node owned the lockres, and if the dead node 
+               /* if this node owned the lockres, and if the dead node
                 * had an EX when he died, blank out the lvb */
                search_node = dead_node;
        else {
@@ -2152,7 +2202,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 
        /* this node is the lockres master:
         * 1) remove any stale locks for the dead node
-        * 2) if the dead node had an EX when he died, blank out the lvb 
+        * 2) if the dead node had an EX when he died, blank out the lvb
         */
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&res->spinlock);
@@ -2260,7 +2310,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
                                }
                                spin_unlock(&res->spinlock);
                                continue;
-                       }                       
+                       }
                        spin_lock(&res->spinlock);
                        /* zero the lvb if necessary */
                        dlm_revalidate_lvb(dlm, res, dead_node);
@@ -2411,7 +2461,7 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
  * this function on each node racing to become the recovery
  * master will not stop attempting this until either:
  * a) this node gets the EX (and becomes the recovery master),
- * or b) dlm->reco.new_master gets set to some nodenum 
+ * or b) dlm->reco.new_master gets set to some nodenum
  * != O2NM_INVALID_NODE_NUM (another node will do the reco).
  * so each time a recovery master is needed, the entire cluster
  * will sync at this point.  if the new master dies, that will
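
The comment above describes the $RECOVERY election that the following
hunks only reformat; the control flow is easier to see condensed. A
sketch of the loop in dlm_pick_recovery_master(), with the error paths
and the begin-reco messaging omitted (the function name here is
illustrative):

/* Every node racing to master recovery contends on the well-known
 * $RECOVERY lockres.  The winner of the EX drives recovery; losers
 * wait for reco.new_master to be published and re-check. */
static void pick_reco_master_sketch(struct dlm_ctxt *dlm)
{
	struct dlm_lockstatus lksb;
	enum dlm_status ret;

again:
	memset(&lksb, 0, sizeof(lksb));
	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
		      DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
		      dlm_reco_ast, dlm, dlm_reco_bast);
	if (ret == DLM_NORMAL) {
		/* got the EX: become reco master, send begin_reco */
	} else if (ret == DLM_NOTQUEUED) {
		/* another node holds it: wait up to a second for it to
		 * publish itself as new_master, then try again */
		wait_event_timeout(dlm->dlm_reco_thread_wq,
				   dlm_reco_master_ready(dlm),
				   msecs_to_jiffies(1000));
		if (!dlm_reco_master_ready(dlm))
			goto again;
	}
}
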
@@ -2424,7 +2474,7 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
 
        mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
             dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
-again: 
+again:
        memset(&lksb, 0, sizeof(lksb));
 
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
@@ -2437,8 +2487,8 @@ again:
        if (ret == DLM_NORMAL) {
                mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
                     dlm->name, dlm->node_num);
-               
-               /* got the EX lock.  check to see if another node 
+
+               /* got the EX lock.  check to see if another node
                 * just became the reco master */
                if (dlm_reco_master_ready(dlm)) {
                        mlog(0, "%s: got reco EX lock, but %u will "
@@ -2451,12 +2501,12 @@ again:
                        /* see if recovery was already finished elsewhere */
                        spin_lock(&dlm->spinlock);
                        if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
-                               status = -EINVAL;       
+                               status = -EINVAL;
                                mlog(0, "%s: got reco EX lock, but "
                                     "node got recovered already\n", dlm->name);
                                if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
                                        mlog(ML_ERROR, "%s: new master is %u "
-                                            "but no dead node!\n", 
+                                            "but no dead node!\n",
                                             dlm->name, dlm->reco.new_master);
                                        BUG();
                                }
@@ -2468,7 +2518,7 @@ again:
                 * set the master and send the messages to begin recovery */
                if (!status) {
                        mlog(0, "%s: dead=%u, this=%u, sending "
-                            "begin_reco now\n", dlm->name, 
+                            "begin_reco now\n", dlm->name,
                             dlm->reco.dead_node, dlm->node_num);
                        status = dlm_send_begin_reco_message(dlm,
                                      dlm->reco.dead_node);
@@ -2501,7 +2551,7 @@ again:
                mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
                     dlm->name, dlm->node_num);
                /* another node is master. wait on
-                * reco.new_master != O2NM_INVALID_NODE_NUM 
+                * reco.new_master != O2NM_INVALID_NODE_NUM
                 * for at most one second */
                wait_event_timeout(dlm->dlm_reco_thread_wq,
                                         dlm_reco_master_ready(dlm),
@@ -2589,9 +2639,17 @@ retry:
                             "begin reco msg (%d)\n", dlm->name, nodenum, ret);
                        ret = 0;
                }
+               if (ret == -EAGAIN) {
+                       mlog(0, "%s: trying to start recovery of node "
+                            "%u, but node %u is waiting for last recovery "
+                            "to complete, backoff for a bit\n", dlm->name,
+                            dead_node, nodenum);
+                       msleep(100);
+                       goto retry;
+               }
                if (ret < 0) {
                        struct dlm_lock_resource *res;
-                       /* this is now a serious problem, possibly ENOMEM 
+                       /* this is now a serious problem, possibly ENOMEM
                         * in the network stack.  must retry */
                        mlog_errno(ret);
                        mlog(ML_ERROR, "begin reco of dlm %s to node %u "
@@ -2604,18 +2662,10 @@ retry:
                        } else {
                                mlog(ML_ERROR, "recovery lock not found\n");
                        }
-                       /* sleep for a bit in hopes that we can avoid 
+                       /* sleep for a bit in hopes that we can avoid
                         * another ENOMEM */
                        msleep(100);
                        goto retry;
-               } else if (ret == EAGAIN) {
-                       mlog(0, "%s: trying to start recovery of node "
-                            "%u, but node %u is waiting for last recovery "
-                            "to complete, backoff for a bit\n", dlm->name,
-                            dead_node, nodenum);
-                       /* TODO Look into replacing msleep with cond_resched() */
-                       msleep(100);
-                       goto retry;
                }
        }
 
@@ -2639,7 +2689,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
                     dlm->name, br->node_idx, br->dead_node,
                     dlm->reco.dead_node, dlm->reco.new_master);
                spin_unlock(&dlm->spinlock);
-               return EAGAIN;
+               return -EAGAIN;
        }
        spin_unlock(&dlm->spinlock);
 
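
The three -EAGAIN hunks are one logical fix. dlm_begin_reco_handler()
used to signal "still finalizing the last recovery" with a positive
EAGAIN; both ends now use the conventional negative errno, and because
the value is now negative the sender must intercept it before the
generic ret < 0 path, which is exactly the reordering above: a busy
peer causes a quiet 100ms backoff instead of falling into the ML_ERROR
retry branch. A sketch of the round trip, assuming (as the senders in
this file do) that the remote handler's return value comes back through
o2net's status out-parameter; the function name is illustrative, the
real one is dlm_send_begin_reco_message():

static int begin_reco_round_trip(struct dlm_ctxt *dlm,
				 struct dlm_begin_reco *br, u8 nodenum)
{
	int ret, status = 0;

retry:
	ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, br,
				 sizeof(*br), nodenum, &status);
	if (ret >= 0)
		ret = status;	/* remote handler's errno, e.g. -EAGAIN */

	if (ret == -EAGAIN) {	/* peer busy: back off, never ML_ERROR */
		msleep(100);
		goto retry;
	}
	if (ret < 0) {		/* real failure, e.g. ENOMEM: log, retry */
		mlog_errno(ret);
		msleep(100);
		goto retry;
	}
	return ret;
}
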
@@ -2664,7 +2714,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
        }
        if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
                mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
-                    "node %u changing it to %u\n", dlm->name, 
+                    "node %u changing it to %u\n", dlm->name,
                     dlm->reco.dead_node, br->node_idx, br->dead_node);
        }
        dlm_set_reco_master(dlm, br->node_idx);
@@ -2730,8 +2780,8 @@ stage2:
                if (ret < 0) {
                        mlog_errno(ret);
                        if (dlm_is_host_down(ret)) {
-                               /* this has no effect on this recovery 
-                                * session, so set the status to zero to 
+                               /* this has no effect on this recovery
+                                * session, so set the status to zero to
                                 * finish out the last recovery */
                                mlog(ML_ERROR, "node %u went down after this "
                                     "node finished recovery.\n", nodenum);
@@ -2768,7 +2818,7 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
        mlog(0, "%s: node %u finalizing recovery stage%d of "
             "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
             fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
- 
+
        spin_lock(&dlm->spinlock);
 
        if (dlm->reco.new_master != fr->node_idx) {