ocfs2: mle ref counting fixes
authorKurt Hackel <kurt.hackel@oracle.com>
Fri, 28 Apr 2006 01:51:26 +0000 (18:51 -0700)
committerMark Fasheh <mark.fasheh@oracle.com>
Mon, 26 Jun 2006 21:42:51 +0000 (14:42 -0700)
Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/dlm/dlmmaster.c

index d1c85f10c6342cd0d8896f24f4cb6a9a699c09b9..19399446aa8e1424b9601122c722863bfb3050c1 100644 (file)
@@ -74,6 +74,7 @@ struct dlm_master_list_entry
        wait_queue_head_t wq;
        atomic_t woken;
        struct kref mle_refs;
+       int inuse;
        unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -337,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
        spin_unlock(&dlm->spinlock);
 }
 
+static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
+{
+       struct dlm_ctxt *dlm;
+       dlm = mle->dlm;
+
+       assert_spin_locked(&dlm->spinlock);
+       assert_spin_locked(&dlm->master_lock);
+       mle->inuse++;
+       kref_get(&mle->mle_refs);
+}
+
+static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
+{
+       struct dlm_ctxt *dlm;
+       dlm = mle->dlm;
+
+       spin_lock(&dlm->spinlock);
+       spin_lock(&dlm->master_lock);
+       mle->inuse--;
+       __dlm_put_mle(mle);
+       spin_unlock(&dlm->master_lock);
+       spin_unlock(&dlm->spinlock);
+
+}
+
 /* remove from list and free */
 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 {
@@ -390,6 +416,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;
+       mle->inuse = 0;
 
        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
@@ -809,7 +836,7 @@ lookup:
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
-       dlm_get_mle(mle);
+       dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
@@ -899,7 +926,7 @@ wait:
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
-       dlm_put_mle(mle);
+       dlm_put_mle_inuse(mle);
 
 wake_waiters:
        spin_lock(&res->spinlock);
@@ -1753,6 +1780,7 @@ ok:
        if (mle) {
                int extra_ref = 0;
                int nn = -1;
+               int rr, err = 0;
                
                spin_lock(&mle->spinlock);
                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1772,27 +1800,64 @@ ok:
                wake_up(&mle->wq);
                spin_unlock(&mle->spinlock);
 
-               if (mle->type == DLM_MLE_MIGRATION && res) {
-                       mlog(0, "finishing off migration of lockres %.*s, "
-                            "from %u to %u\n",
-                              res->lockname.len, res->lockname.name,
-                              dlm->node_num, mle->new_master);
+               if (res) {
                        spin_lock(&res->spinlock);
-                       res->state &= ~DLM_LOCK_RES_MIGRATING;
-                       dlm_change_lockres_owner(dlm, res, mle->new_master);
-                       BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+                       if (mle->type == DLM_MLE_MIGRATION) {
+                               mlog(0, "finishing off migration of lockres %.*s, "
+                                       "from %u to %u\n",
+                                       res->lockname.len, res->lockname.name,
+                                       dlm->node_num, mle->new_master);
+                               res->state &= ~DLM_LOCK_RES_MIGRATING;
+                               dlm_change_lockres_owner(dlm, res, mle->new_master);
+                               BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+                       } else {
+                               dlm_change_lockres_owner(dlm, res, mle->master);
+                       }
                        spin_unlock(&res->spinlock);
                }
-               /* master is known, detach if not already detached */
-               dlm_mle_detach_hb_events(dlm, mle);
-               dlm_put_mle(mle);
-               
+
+               /* master is known, detach if not already detached.
+                * ensures that only one assert_master call will happen
+                * on this mle. */
+               spin_lock(&dlm->spinlock);
+               spin_lock(&dlm->master_lock);
+
+               rr = atomic_read(&mle->mle_refs.refcount);
+               if (mle->inuse > 0) {
+                       if (extra_ref && rr < 3)
+                               err = 1;
+                       else if (!extra_ref && rr < 2)
+                               err = 1;
+               } else {
+                       if (extra_ref && rr < 2)
+                               err = 1;
+                       else if (!extra_ref && rr < 1)
+                               err = 1;
+               }
+               if (err) {
+                       mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
+                            "that will mess up this node, refs=%d, extra=%d, "
+                            "inuse=%d\n", dlm->name, namelen, name,
+                            assert->node_idx, rr, extra_ref, mle->inuse);
+                       dlm_print_one_mle(mle);
+               }
+               list_del_init(&mle->list);
+               __dlm_mle_detach_hb_events(dlm, mle);
+               __dlm_put_mle(mle);
                if (extra_ref) {
                        /* the assert master message now balances the extra
                         * ref given by the master / migration request message.
                         * if this is the last put, it will be removed
                         * from the list. */
-                       dlm_put_mle(mle);
+                       __dlm_put_mle(mle);
+               }
+               spin_unlock(&dlm->master_lock);
+               spin_unlock(&dlm->spinlock);
+       } else if (res) {
+               if (res->owner != assert->node_idx) {
+                       mlog(0, "assert_master from %u, but current "
+                            "owner is %u (%.*s), no mle\n", assert->node_idx,
+                            res->owner, namelen, name);
                }
        }
 
@@ -2138,7 +2203,7 @@ fail:
         * take both dlm->spinlock and dlm->master_lock */
        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
-       dlm_get_mle(mle);
+       dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
@@ -2155,7 +2220,10 @@ fail:
                /* migration failed, detach and clean up mle */
                dlm_mle_detach_hb_events(dlm, mle);
                dlm_put_mle(mle);
-               dlm_put_mle(mle);
+               dlm_put_mle_inuse(mle);
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_MIGRATING;
+               spin_unlock(&res->spinlock);
                goto leave;
        }
 
@@ -2196,7 +2264,10 @@ fail:
                        /* migration failed, detach and clean up mle */
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
-                       dlm_put_mle(mle);
+                       dlm_put_mle_inuse(mle);
+                       spin_lock(&res->spinlock);
+                       res->state &= ~DLM_LOCK_RES_MIGRATING;
+                       spin_unlock(&res->spinlock);
                        goto leave;
                }
                /* TODO: if node died: stop, clean up, return error */
@@ -2212,7 +2283,7 @@ fail:
 
        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
-       dlm_put_mle(mle);
+       dlm_put_mle_inuse(mle);
        ret = 0;
 
        dlm_lockres_calc_usage(dlm, res);