nfsd: optimize destroy_lockowner cl_lock thrashing
[firefly-linux-kernel-4.4.55.git] / fs / nfsd / nfs4state.c
index b0c0f4cdf5031c5cd3dd7401a31df2204269bb83..9358cbe2283d87af4a2dc076308c276265f96e55 100644 (file)
@@ -85,6 +85,12 @@ static DEFINE_MUTEX(client_mutex);
  */
 static DEFINE_SPINLOCK(state_lock);
 
+/*
+ * A waitqueue for all in-progress 4.0 CLOSE operations that are waiting for
+ * the refcount on the open stateid to drop.
+ */
+static DECLARE_WAIT_QUEUE_HEAD(close_wq);
+
 static struct kmem_cache *openowner_slab;
 static struct kmem_cache *lockowner_slab;
 static struct kmem_cache *file_slab;
@@ -233,6 +239,44 @@ static void nfsd4_put_session(struct nfsd4_session *ses)
        spin_unlock(&nn->client_lock);
 }
 
+static int
+same_owner_str(struct nfs4_stateowner *sop, struct xdr_netobj *owner)
+{
+       return (sop->so_owner.len == owner->len) &&
+               0 == memcmp(sop->so_owner.data, owner->data, owner->len);
+}
+
+static struct nfs4_openowner *
+find_openstateowner_str_locked(unsigned int hashval, struct nfsd4_open *open,
+                       struct nfs4_client *clp)
+{
+       struct nfs4_stateowner *so;
+
+       lockdep_assert_held(&clp->cl_lock);
+
+       list_for_each_entry(so, &clp->cl_ownerstr_hashtbl[hashval],
+                           so_strhash) {
+               if (!so->so_is_open_owner)
+                       continue;
+               if (same_owner_str(so, &open->op_owner)) {
+                       atomic_inc(&so->so_count);
+                       return openowner(so);
+               }
+       }
+       return NULL;
+}
+
+static struct nfs4_openowner *
+find_openstateowner_str(unsigned int hashval, struct nfsd4_open *open,
+                       struct nfs4_client *clp)
+{
+       struct nfs4_openowner *oo;
+
+       spin_lock(&clp->cl_lock);
+       oo = find_openstateowner_str_locked(hashval, open, clp);
+       spin_unlock(&clp->cl_lock);
+       return oo;
+}
 
 static inline u32
 opaque_hashval(const void *ptr, int nbytes)
@@ -355,12 +399,11 @@ unsigned long max_delegations;
 #define OWNER_HASH_SIZE             (1 << OWNER_HASH_BITS)
 #define OWNER_HASH_MASK             (OWNER_HASH_SIZE - 1)
 
-static unsigned int ownerstr_hashval(u32 clientid, struct xdr_netobj *ownername)
+static unsigned int ownerstr_hashval(struct xdr_netobj *ownername)
 {
        unsigned int ret;
 
        ret = opaque_hashval(ownername->data, ownername->len);
-       ret += clientid;
        return ret & OWNER_HASH_MASK;
 }
 
@@ -640,8 +683,10 @@ nfs4_put_stid(struct nfs4_stid *s)
 
        might_lock(&clp->cl_lock);
 
-       if (!atomic_dec_and_lock(&s->sc_count, &clp->cl_lock))
+       if (!atomic_dec_and_lock(&s->sc_count, &clp->cl_lock)) {
+               wake_up_all(&close_wq);
                return;
+       }
        idr_remove(&clp->cl_stateids, s->sc_stateid.si_opaque.so_id);
        spin_unlock(&clp->cl_lock);
        s->sc_free(s);
@@ -890,6 +935,20 @@ release_all_access(struct nfs4_ol_stateid *stp)
        }
 }
 
+static void nfs4_put_stateowner(struct nfs4_stateowner *sop)
+{
+       struct nfs4_client *clp = sop->so_client;
+
+       might_lock(&clp->cl_lock);
+
+       if (!atomic_dec_and_lock(&sop->so_count, &clp->cl_lock))
+               return;
+       sop->so_ops->so_unhash(sop);
+       spin_unlock(&clp->cl_lock);
+       kfree(sop->so_owner.data);
+       sop->so_ops->so_free(sop);
+}
+
 static void unhash_generic_stateid(struct nfs4_ol_stateid *stp)
 {
        struct nfs4_file *fp = stp->st_stid.sc_file;
@@ -907,6 +966,8 @@ static void nfs4_free_ol_stateid(struct nfs4_stid *stid)
        struct nfs4_ol_stateid *stp = openlockstateid(stid);
 
        release_all_access(stp);
+       if (stp->st_stateowner)
+               nfs4_put_stateowner(stp->st_stateowner);
        kmem_cache_free(stateid_slab, stid);
 }
 
@@ -922,55 +983,69 @@ static void nfs4_free_lock_stateid(struct nfs4_stid *stid)
        nfs4_free_ol_stateid(stid);
 }
 
-static void __release_lock_stateid(struct nfs4_ol_stateid *stp)
+static void unhash_lock_stateid(struct nfs4_ol_stateid *stp)
 {
        struct nfs4_openowner *oo = openowner(stp->st_openstp->st_stateowner);
 
-       spin_lock(&oo->oo_owner.so_client->cl_lock);
-       list_del(&stp->st_locks);
+       lockdep_assert_held(&oo->oo_owner.so_client->cl_lock);
+
+       list_del_init(&stp->st_locks);
        unhash_generic_stateid(stp);
        unhash_stid(&stp->st_stid);
-       spin_unlock(&oo->oo_owner.so_client->cl_lock);
-       nfs4_put_stid(&stp->st_stid);
 }
 
-static void unhash_lockowner(struct nfs4_lockowner *lo)
+static void release_lock_stateid(struct nfs4_ol_stateid *stp)
 {
-       struct nfs4_ol_stateid *stp;
+       struct nfs4_openowner *oo = openowner(stp->st_openstp->st_stateowner);
 
-       list_del(&lo->lo_owner.so_strhash);
-       while (!list_empty(&lo->lo_owner.so_stateids)) {
-               stp = list_first_entry(&lo->lo_owner.so_stateids,
-                               struct nfs4_ol_stateid, st_perstateowner);
-               __release_lock_stateid(stp);
-       }
+       spin_lock(&oo->oo_owner.so_client->cl_lock);
+       unhash_lock_stateid(stp);
+       spin_unlock(&oo->oo_owner.so_client->cl_lock);
+       nfs4_put_stid(&stp->st_stid);
 }
 
-static void nfs4_free_lockowner(struct nfs4_lockowner *lo)
+static void unhash_lockowner_locked(struct nfs4_lockowner *lo)
 {
-       kfree(lo->lo_owner.so_owner.data);
-       kmem_cache_free(lockowner_slab, lo);
-}
+       struct nfs4_client *clp = lo->lo_owner.so_client;
 
-static void release_lockowner(struct nfs4_lockowner *lo)
-{
-       unhash_lockowner(lo);
-       nfs4_free_lockowner(lo);
-}
+       lockdep_assert_held(&clp->cl_lock);
 
-static void release_lockowner_if_empty(struct nfs4_lockowner *lo)
-{
-       if (list_empty(&lo->lo_owner.so_stateids))
-               release_lockowner(lo);
+       list_del_init(&lo->lo_owner.so_strhash);
 }
 
-static void release_lock_stateid(struct nfs4_ol_stateid *stp)
+static void release_lockowner(struct nfs4_lockowner *lo)
 {
-       struct nfs4_lockowner *lo;
+       struct nfs4_client *clp = lo->lo_owner.so_client;
+       struct nfs4_ol_stateid *stp;
+       struct list_head reaplist;
 
-       lo = lockowner(stp->st_stateowner);
-       __release_lock_stateid(stp);
-       release_lockowner_if_empty(lo);
+       INIT_LIST_HEAD(&reaplist);
+
+       spin_lock(&clp->cl_lock);
+       unhash_lockowner_locked(lo);
+       while (!list_empty(&lo->lo_owner.so_stateids)) {
+               stp = list_first_entry(&lo->lo_owner.so_stateids,
+                               struct nfs4_ol_stateid, st_perstateowner);
+               unhash_lock_stateid(stp);
+               /*
+                * We now know that no new references can be added to the
+                * stateid. If ours is the last one, finish the unhashing
+                * and put it on the list to be reaped.
+                */
+               if (atomic_dec_and_test(&stp->st_stid.sc_count)) {
+                       idr_remove(&clp->cl_stateids,
+                               stp->st_stid.sc_stateid.si_opaque.so_id);
+                       list_add(&stp->st_locks, &reaplist);
+               }
+       }
+       spin_unlock(&clp->cl_lock);
+       while (!list_empty(&reaplist)) {
+               stp = list_first_entry(&reaplist, struct nfs4_ol_stateid,
+                                       st_locks);
+               list_del(&stp->st_locks);
+               stp->st_stid.sc_free(&stp->st_stid);
+       }
+       nfs4_put_stateowner(&lo->lo_owner);
 }
 
 static void release_open_stateid_locks(struct nfs4_ol_stateid *open_stp)
@@ -1002,17 +1077,14 @@ static void release_open_stateid(struct nfs4_ol_stateid *stp)
        nfs4_put_stid(&stp->st_stid);
 }
 
-static void unhash_openowner(struct nfs4_openowner *oo)
+static void unhash_openowner_locked(struct nfs4_openowner *oo)
 {
-       struct nfs4_ol_stateid *stp;
+       struct nfs4_client *clp = oo->oo_owner.so_client;
 
-       list_del(&oo->oo_owner.so_strhash);
-       list_del(&oo->oo_perclient);
-       while (!list_empty(&oo->oo_owner.so_stateids)) {
-               stp = list_first_entry(&oo->oo_owner.so_stateids,
-                               struct nfs4_ol_stateid, st_perstateowner);
-               release_open_stateid(stp);
-       }
+       lockdep_assert_held(&clp->cl_lock);
+
+       list_del_init(&oo->oo_owner.so_strhash);
+       list_del_init(&oo->oo_perclient);
 }
 
 static void release_last_closed_stateid(struct nfs4_openowner *oo)
@@ -1020,23 +1092,38 @@ static void release_last_closed_stateid(struct nfs4_openowner *oo)
        struct nfs4_ol_stateid *s = oo->oo_last_closed_stid;
 
        if (s) {
-               nfs4_put_stid(&s->st_stid);
+               list_del_init(&oo->oo_close_lru);
                oo->oo_last_closed_stid = NULL;
+               nfs4_put_stid(&s->st_stid);
        }
 }
 
-static void nfs4_free_openowner(struct nfs4_openowner *oo)
+static void release_openowner_stateids(struct nfs4_openowner *oo)
 {
-       kfree(oo->oo_owner.so_owner.data);
-       kmem_cache_free(openowner_slab, oo);
+       struct nfs4_ol_stateid *stp;
+       struct nfs4_client *clp = oo->oo_owner.so_client;
+
+       lockdep_assert_held(&clp->cl_lock);
+
+       while (!list_empty(&oo->oo_owner.so_stateids)) {
+               stp = list_first_entry(&oo->oo_owner.so_stateids,
+                               struct nfs4_ol_stateid, st_perstateowner);
+               spin_unlock(&clp->cl_lock);
+               release_open_stateid(stp);
+               spin_lock(&clp->cl_lock);
+       }
 }
 
 static void release_openowner(struct nfs4_openowner *oo)
 {
-       unhash_openowner(oo);
-       list_del(&oo->oo_close_lru);
+       struct nfs4_client *clp = oo->oo_owner.so_client;
+
+       spin_lock(&clp->cl_lock);
+       unhash_openowner_locked(oo);
+       release_openowner_stateids(oo);
+       spin_unlock(&clp->cl_lock);
        release_last_closed_stateid(oo);
-       nfs4_free_openowner(oo);
+       nfs4_put_stateowner(&oo->oo_owner);
 }
 
 static inline int
@@ -1073,7 +1160,7 @@ void nfsd4_bump_seqid(struct nfsd4_compound_state *cstate, __be32 nfserr)
                return;
 
        if (!seqid_mutating_err(ntohl(nfserr))) {
-               cstate->replay_owner = NULL;
+               nfsd4_cstate_clear_replay(cstate);
                return;
        }
        if (!so)
@@ -1416,15 +1503,20 @@ STALE_CLIENTID(clientid_t *clid, struct nfsd_net *nn)
 static struct nfs4_client *alloc_client(struct xdr_netobj name)
 {
        struct nfs4_client *clp;
+       int i;
 
        clp = kzalloc(sizeof(struct nfs4_client), GFP_KERNEL);
        if (clp == NULL)
                return NULL;
        clp->cl_name.data = kmemdup(name.data, name.len, GFP_KERNEL);
-       if (clp->cl_name.data == NULL) {
-               kfree(clp);
-               return NULL;
-       }
+       if (clp->cl_name.data == NULL)
+               goto err_no_name;
+       clp->cl_ownerstr_hashtbl = kmalloc(sizeof(struct list_head) *
+                       OWNER_HASH_SIZE, GFP_KERNEL);
+       if (!clp->cl_ownerstr_hashtbl)
+               goto err_no_hashtbl;
+       for (i = 0; i < OWNER_HASH_SIZE; i++)
+               INIT_LIST_HEAD(&clp->cl_ownerstr_hashtbl[i]);
        clp->cl_name.len = name.len;
        INIT_LIST_HEAD(&clp->cl_sessions);
        idr_init(&clp->cl_stateids);
@@ -1439,6 +1531,11 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name)
        spin_lock_init(&clp->cl_lock);
        rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
        return clp;
+err_no_hashtbl:
+       kfree(clp->cl_name.data);
+err_no_name:
+       kfree(clp);
+       return NULL;
 }
 
 static void
@@ -1457,6 +1554,7 @@ free_client(struct nfs4_client *clp)
        }
        rpc_destroy_wait_queue(&clp->cl_cb_waitq);
        free_svc_cred(&clp->cl_cred);
+       kfree(clp->cl_ownerstr_hashtbl);
        kfree(clp->cl_name.data);
        idr_destroy(&clp->cl_stateids);
        kfree(clp);
@@ -1503,6 +1601,7 @@ destroy_client(struct nfs4_client *clp)
        }
        while (!list_empty(&clp->cl_openowners)) {
                oo = list_entry(clp->cl_openowners.next, struct nfs4_openowner, oo_perclient);
+               atomic_inc(&oo->oo_owner.so_count);
                release_openowner(oo);
        }
        nfsd4_shutdown_callback(clp);
@@ -1696,8 +1795,12 @@ find_stateid_by_type(struct nfs4_client *cl, stateid_t *t, char typemask)
 
        spin_lock(&cl->cl_lock);
        s = find_stateid_locked(cl, t);
-       if (s != NULL && !(typemask & s->sc_type))
-               s = NULL;
+       if (s != NULL) {
+               if (typemask & s->sc_type)
+                       atomic_inc(&s->sc_count);
+               else
+                       s = NULL;
+       }
        spin_unlock(&cl->cl_lock);
        return s;
 }
@@ -2940,6 +3043,28 @@ static void init_nfs4_replay(struct nfs4_replay *rp)
        rp->rp_status = nfserr_serverfault;
        rp->rp_buflen = 0;
        rp->rp_buf = rp->rp_ibuf;
+       mutex_init(&rp->rp_mutex);
+}
+
+static void nfsd4_cstate_assign_replay(struct nfsd4_compound_state *cstate,
+               struct nfs4_stateowner *so)
+{
+       if (!nfsd4_has_session(cstate)) {
+               mutex_lock(&so->so_replay.rp_mutex);
+               cstate->replay_owner = so;
+               atomic_inc(&so->so_count);
+       }
+}
+
+void nfsd4_cstate_clear_replay(struct nfsd4_compound_state *cstate)
+{
+       struct nfs4_stateowner *so = cstate->replay_owner;
+
+       if (so != NULL) {
+               cstate->replay_owner = NULL;
+               mutex_unlock(&so->so_replay.rp_mutex);
+               nfs4_put_stateowner(so);
+       }
 }
 
 static inline void *alloc_stateowner(struct kmem_cache *slab, struct xdr_netobj *owner, struct nfs4_client *clp)
@@ -2960,36 +3085,63 @@ static inline void *alloc_stateowner(struct kmem_cache *slab, struct xdr_netobj
        INIT_LIST_HEAD(&sop->so_stateids);
        sop->so_client = clp;
        init_nfs4_replay(&sop->so_replay);
+       atomic_set(&sop->so_count, 1);
        return sop;
 }
 
 static void hash_openowner(struct nfs4_openowner *oo, struct nfs4_client *clp, unsigned int strhashval)
 {
-       struct nfsd_net *nn = net_generic(clp->net, nfsd_net_id);
+       lockdep_assert_held(&clp->cl_lock);
 
-       list_add(&oo->oo_owner.so_strhash, &nn->ownerstr_hashtbl[strhashval]);
+       list_add(&oo->oo_owner.so_strhash,
+                &clp->cl_ownerstr_hashtbl[strhashval]);
        list_add(&oo->oo_perclient, &clp->cl_openowners);
 }
 
+static void nfs4_unhash_openowner(struct nfs4_stateowner *so)
+{
+       unhash_openowner_locked(openowner(so));
+}
+
+static void nfs4_free_openowner(struct nfs4_stateowner *so)
+{
+       struct nfs4_openowner *oo = openowner(so);
+
+       kmem_cache_free(openowner_slab, oo);
+}
+
+static const struct nfs4_stateowner_operations openowner_ops = {
+       .so_unhash =    nfs4_unhash_openowner,
+       .so_free =      nfs4_free_openowner,
+};
+
 static struct nfs4_openowner *
 alloc_init_open_stateowner(unsigned int strhashval, struct nfsd4_open *open,
                           struct nfsd4_compound_state *cstate)
 {
        struct nfs4_client *clp = cstate->clp;
-       struct nfs4_openowner *oo;
+       struct nfs4_openowner *oo, *ret;
 
        oo = alloc_stateowner(openowner_slab, &open->op_owner, clp);
        if (!oo)
                return NULL;
+       oo->oo_owner.so_ops = &openowner_ops;
        oo->oo_owner.so_is_open_owner = 1;
        oo->oo_owner.so_seqid = open->op_seqid;
-       oo->oo_flags = NFS4_OO_NEW;
+       oo->oo_flags = 0;
        if (nfsd4_has_session(cstate))
                oo->oo_flags |= NFS4_OO_CONFIRMED;
        oo->oo_time = 0;
        oo->oo_last_closed_stid = NULL;
        INIT_LIST_HEAD(&oo->oo_close_lru);
-       hash_openowner(oo, clp, strhashval);
+       spin_lock(&clp->cl_lock);
+       ret = find_openstateowner_str_locked(strhashval, open, clp);
+       if (ret == NULL) {
+               hash_openowner(oo, clp, strhashval);
+               ret = oo;
+       } else
+               nfs4_free_openowner(&oo->oo_owner);
+       spin_unlock(&clp->cl_lock);
        return oo;
 }
 
@@ -3000,6 +3152,7 @@ static void init_open_stateid(struct nfs4_ol_stateid *stp, struct nfs4_file *fp,
        stp->st_stid.sc_type = NFS4_OPEN_STID;
        INIT_LIST_HEAD(&stp->st_locks);
        stp->st_stateowner = &oo->oo_owner;
+       atomic_inc(&stp->st_stateowner->so_count);
        get_nfs4_file(fp);
        stp->st_stid.sc_file = fp;
        stp->st_access_bmap = 0;
@@ -3013,47 +3166,40 @@ static void init_open_stateid(struct nfs4_ol_stateid *stp, struct nfs4_file *fp,
        spin_unlock(&oo->oo_owner.so_client->cl_lock);
 }
 
+/*
+ * In the 4.0 case we need to keep the owners around a little while to handle
+ * CLOSE replay. We still do need to release any file access that is held by
+ * them before returning however.
+ */
 static void
-move_to_close_lru(struct nfs4_openowner *oo, struct net *net)
+move_to_close_lru(struct nfs4_ol_stateid *s, struct net *net)
 {
-       struct nfsd_net *nn = net_generic(net, nfsd_net_id);
+       struct nfs4_openowner *oo = openowner(s->st_stateowner);
+       struct nfsd_net *nn = net_generic(s->st_stid.sc_client->net,
+                                               nfsd_net_id);
 
        dprintk("NFSD: move_to_close_lru nfs4_openowner %p\n", oo);
 
-       list_move_tail(&oo->oo_close_lru, &nn->close_lru);
-       oo->oo_time = get_seconds();
-}
-
-static int
-same_owner_str(struct nfs4_stateowner *sop, struct xdr_netobj *owner,
-                                                       clientid_t *clid)
-{
-       return (sop->so_owner.len == owner->len) &&
-               0 == memcmp(sop->so_owner.data, owner->data, owner->len) &&
-               (sop->so_client->cl_clientid.cl_id == clid->cl_id);
-}
-
-static struct nfs4_openowner *
-find_openstateowner_str(unsigned int hashval, struct nfsd4_open *open,
-                       bool sessions, struct nfsd_net *nn)
-{
-       struct nfs4_stateowner *so;
-       struct nfs4_openowner *oo;
-       struct nfs4_client *clp;
+       /*
+        * We know that we hold one reference via nfsd4_close, and another
+        * "persistent" reference for the client. If the refcount is higher
+        * than 2, then there are still calls in progress that are using this
+        * stateid. We can't put the sc_file reference until they are finished.
+        * Wait for the refcount to drop to 2. Since it has been unhashed,
+        * there should be no danger of the refcount going back up again at
+        * this point.
+        */
+       wait_event(close_wq, atomic_read(&s->st_stid.sc_count) == 2);
 
-       list_for_each_entry(so, &nn->ownerstr_hashtbl[hashval], so_strhash) {
-               if (!so->so_is_open_owner)
-                       continue;
-               if (same_owner_str(so, &open->op_owner, &open->op_clientid)) {
-                       oo = openowner(so);
-                       clp = oo->oo_owner.so_client;
-                       if ((bool)clp->cl_minorversion != sessions)
-                               return NULL;
-                       renew_client(oo->oo_owner.so_client);
-                       return oo;
-               }
+       release_all_access(s);
+       if (s->st_stid.sc_file) {
+               put_nfs4_file(s->st_stid.sc_file);
+               s->st_stid.sc_file = NULL;
        }
-       return NULL;
+       release_last_closed_stateid(oo);
+       oo->oo_last_closed_stid = s;
+       list_move_tail(&oo->oo_close_lru, &nn->close_lru);
+       oo->oo_time = get_seconds();
 }
 
 /* search file_hashtbl[] for file */
@@ -3277,8 +3423,8 @@ nfsd4_process_open1(struct nfsd4_compound_state *cstate,
                return status;
        clp = cstate->clp;
 
-       strhashval = ownerstr_hashval(clientid->cl_id, &open->op_owner);
-       oo = find_openstateowner_str(strhashval, open, cstate->minorversion, nn);
+       strhashval = ownerstr_hashval(&open->op_owner);
+       oo = find_openstateowner_str(strhashval, open, clp);
        open->op_openowner = oo;
        if (!oo) {
                goto new_owner;
@@ -3326,8 +3472,6 @@ static struct nfs4_delegation *find_deleg_stateid(struct nfs4_client *cl, statei
        ret = find_stateid_by_type(cl, s, NFS4_DELEG_STID);
        if (!ret)
                return NULL;
-       /* FIXME: move into find_stateid_by_type */
-       atomic_inc(&ret->sc_count);
        return delegstateid(ret);
 }
 
@@ -3844,20 +3988,14 @@ out:
        return status;
 }
 
-void nfsd4_cleanup_open_state(struct nfsd4_open *open, __be32 status)
+void nfsd4_cleanup_open_state(struct nfsd4_compound_state *cstate,
+                             struct nfsd4_open *open, __be32 status)
 {
        if (open->op_openowner) {
-               struct nfs4_openowner *oo = open->op_openowner;
-
-               if (!list_empty(&oo->oo_owner.so_stateids))
-                       list_del_init(&oo->oo_close_lru);
-               if (oo->oo_flags & NFS4_OO_NEW) {
-                       if (status) {
-                               release_openowner(oo);
-                               open->op_openowner = NULL;
-                       } else
-                               oo->oo_flags &= ~NFS4_OO_NEW;
-               }
+               struct nfs4_stateowner *so = &open->op_openowner->oo_owner;
+
+               nfsd4_cstate_assign_replay(cstate, so);
+               nfs4_put_stateowner(so);
        }
        if (open->op_file)
                nfsd4_free_file(open->op_file);
@@ -3973,7 +4111,7 @@ nfs4_laundromat(struct nfsd_net *nn)
                        new_timeo = min(new_timeo, t);
                        break;
                }
-               release_openowner(oo);
+               release_last_closed_stateid(oo);
        }
        new_timeo = max_t(time_t, new_timeo, NFSD_LAUNDROMAT_MINTIMEOUT);
        nfs4_unlock_state();
@@ -4204,7 +4342,7 @@ nfs4_preprocess_stateid_op(struct net *net, struct nfsd4_compound_state *cstate,
                                NFS4_DELEG_STID|NFS4_OPEN_STID|NFS4_LOCK_STID,
                                &s, nn);
        if (status)
-               goto out;
+               goto unlock_state;
        status = check_stateid_generation(stateid, &s->sc_stateid, nfsd4_has_session(cstate));
        if (status)
                goto out;
@@ -4253,6 +4391,8 @@ nfs4_preprocess_stateid_op(struct net *net, struct nfsd4_compound_state *cstate,
        if (file)
                *filpp = file;
 out:
+       nfs4_put_stid(s);
+unlock_state:
        nfs4_unlock_state();
        return status;
 }
@@ -4264,7 +4404,7 @@ nfsd4_free_lock_stateid(struct nfs4_ol_stateid *stp)
 
        if (check_for_locks(stp->st_stid.sc_file, lo))
                return nfserr_locks_held;
-       release_lockowner_if_empty(lo);
+       release_lock_stateid(stp);
        return nfs_ok;
 }
 
@@ -4386,15 +4526,13 @@ nfs4_preprocess_seqid_op(struct nfsd4_compound_state *cstate, u32 seqid,
        if (status)
                return status;
        stp = openlockstateid(s);
-       if (!nfsd4_has_session(cstate))
-               cstate->replay_owner = stp->st_stateowner;
+       nfsd4_cstate_assign_replay(cstate, stp->st_stateowner);
 
        status = nfs4_seqid_op_checks(cstate, stateid, seqid, stp);
-       if (!status) {
-               /* FIXME: move into find_stateid_by_type */
-               atomic_inc(&stp->st_stid.sc_count);
+       if (!status)
                *stpp = stp;
-       }
+       else
+               nfs4_put_stid(&stp->st_stid);
        return status;
 }
 
@@ -4457,8 +4595,7 @@ put_stateid:
        nfs4_put_stid(&stp->st_stid);
 out:
        nfsd4_bump_seqid(cstate, status);
-       if (!cstate->replay_owner)
-               nfs4_unlock_state();
+       nfs4_unlock_state();
        return status;
 }
 
@@ -4532,39 +4669,21 @@ put_stateid:
        nfs4_put_stid(&stp->st_stid);
 out:
        nfsd4_bump_seqid(cstate, status);
-       if (!cstate->replay_owner)
-               nfs4_unlock_state();
+       nfs4_unlock_state();
        return status;
 }
 
 static void nfsd4_close_open_stateid(struct nfs4_ol_stateid *s)
 {
        struct nfs4_client *clp = s->st_stid.sc_client;
-       struct nfs4_openowner *oo = openowner(s->st_stateowner);
 
        s->st_stid.sc_type = NFS4_CLOSED_STID;
        unhash_open_stateid(s);
 
-       if (clp->cl_minorversion) {
-               if (list_empty(&oo->oo_owner.so_stateids))
-                       release_openowner(oo);
+       if (clp->cl_minorversion)
                nfs4_put_stid(&s->st_stid);
-       } else {
-               /*
-                * In the 4.0 case we need to keep the owners around a
-                * little while to handle CLOSE replay. We still do need
-                * to release any file access that is held by them
-                * before returning however.
-                */
-               release_all_access(s);
-               if (s->st_stid.sc_file) {
-                       put_nfs4_file(s->st_stid.sc_file);
-                       s->st_stid.sc_file = NULL;
-               }
-               oo->oo_last_closed_stid = s;
-               if (list_empty(&oo->oo_owner.so_stateids))
-                       move_to_close_lru(oo, clp->net);
-       }
+       else
+               move_to_close_lru(s, clp->net);
 }
 
 /*
@@ -4598,8 +4717,7 @@ nfsd4_close(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
        /* put reference from nfs4_preprocess_seqid_op */
        nfs4_put_stid(&stp->st_stid);
 out:
-       if (!cstate->replay_owner)
-               nfs4_unlock_state();
+       nfs4_unlock_state();
        return status;
 }
 
@@ -4623,9 +4741,11 @@ nfsd4_delegreturn(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
        dp = delegstateid(s);
        status = check_stateid_generation(stateid, &dp->dl_stid.sc_stateid, nfsd4_has_session(cstate));
        if (status)
-               goto out;
+               goto put_stateid;
 
        destroy_delegation(dp);
+put_stateid:
+       nfs4_put_stid(&dp->dl_stid);
 out:
        nfs4_unlock_state();
 
@@ -4708,22 +4828,53 @@ nevermind:
 }
 
 static struct nfs4_lockowner *
-find_lockowner_str(clientid_t *clid, struct xdr_netobj *owner,
-               struct nfsd_net *nn)
+find_lockowner_str_locked(clientid_t *clid, struct xdr_netobj *owner,
+               struct nfs4_client *clp)
 {
-       unsigned int strhashval = ownerstr_hashval(clid->cl_id, owner);
+       unsigned int strhashval = ownerstr_hashval(owner);
        struct nfs4_stateowner *so;
 
-       list_for_each_entry(so, &nn->ownerstr_hashtbl[strhashval], so_strhash) {
+       list_for_each_entry(so, &clp->cl_ownerstr_hashtbl[strhashval],
+                           so_strhash) {
                if (so->so_is_open_owner)
                        continue;
-               if (!same_owner_str(so, owner, clid))
+               if (!same_owner_str(so, owner))
                        continue;
+               atomic_inc(&so->so_count);
                return lockowner(so);
        }
        return NULL;
 }
 
+static struct nfs4_lockowner *
+find_lockowner_str(clientid_t *clid, struct xdr_netobj *owner,
+               struct nfs4_client *clp)
+{
+       struct nfs4_lockowner *lo;
+
+       spin_lock(&clp->cl_lock);
+       lo = find_lockowner_str_locked(clid, owner, clp);
+       spin_unlock(&clp->cl_lock);
+       return lo;
+}
+
+static void nfs4_unhash_lockowner(struct nfs4_stateowner *sop)
+{
+       unhash_lockowner_locked(lockowner(sop));
+}
+
+static void nfs4_free_lockowner(struct nfs4_stateowner *sop)
+{
+       struct nfs4_lockowner *lo = lockowner(sop);
+
+       kmem_cache_free(lockowner_slab, lo);
+}
+
+static const struct nfs4_stateowner_operations lockowner_ops = {
+       .so_unhash =    nfs4_unhash_lockowner,
+       .so_free =      nfs4_free_lockowner,
+};
+
 /*
  * Alloc a lock owner structure.
  * Called in nfsd4_lock - therefore, OPEN and OPEN_CONFIRM (if needed) has 
@@ -4732,19 +4883,29 @@ find_lockowner_str(clientid_t *clid, struct xdr_netobj *owner,
  * strhashval = ownerstr_hashval
  */
 static struct nfs4_lockowner *
-alloc_init_lock_stateowner(unsigned int strhashval, struct nfs4_client *clp, struct nfs4_ol_stateid *open_stp, struct nfsd4_lock *lock) {
-       struct nfs4_lockowner *lo;
-       struct nfsd_net *nn = net_generic(clp->net, nfsd_net_id);
+alloc_init_lock_stateowner(unsigned int strhashval, struct nfs4_client *clp,
+                          struct nfs4_ol_stateid *open_stp,
+                          struct nfsd4_lock *lock)
+{
+       struct nfs4_lockowner *lo, *ret;
 
        lo = alloc_stateowner(lockowner_slab, &lock->lk_new_owner, clp);
        if (!lo)
                return NULL;
        INIT_LIST_HEAD(&lo->lo_owner.so_stateids);
        lo->lo_owner.so_is_open_owner = 0;
-       /* It is the openowner seqid that will be incremented in encode in the
-        * case of new lockowners; so increment the lock seqid manually: */
-       lo->lo_owner.so_seqid = lock->lk_new_lock_seqid + 1;
-       list_add(&lo->lo_owner.so_strhash, &nn->ownerstr_hashtbl[strhashval]);
+       lo->lo_owner.so_seqid = lock->lk_new_lock_seqid;
+       lo->lo_owner.so_ops = &lockowner_ops;
+       spin_lock(&clp->cl_lock);
+       ret = find_lockowner_str_locked(&clp->cl_clientid,
+                       &lock->lk_new_owner, clp);
+       if (ret == NULL) {
+               list_add(&lo->lo_owner.so_strhash,
+                        &clp->cl_ownerstr_hashtbl[strhashval]);
+               ret = lo;
+       } else
+               nfs4_free_lockowner(&lo->lo_owner);
+       spin_unlock(&clp->cl_lock);
        return lo;
 }
 
@@ -4760,6 +4921,7 @@ init_lock_stateid(struct nfs4_ol_stateid *stp, struct nfs4_lockowner *lo,
        atomic_inc(&stp->st_stid.sc_count);
        stp->st_stid.sc_type = NFS4_LOCK_STID;
        stp->st_stateowner = &lo->lo_owner;
+       atomic_inc(&lo->lo_owner.so_count);
        get_nfs4_file(fp);
        stp->st_stid.sc_file = fp;
        stp->st_stid.sc_free = nfs4_free_lock_stateid;
@@ -4848,34 +5010,37 @@ lookup_or_create_lock_state(struct nfsd4_compound_state *cstate,
                            struct nfsd4_lock *lock,
                            struct nfs4_ol_stateid **lst, bool *new)
 {
+       __be32 status;
        struct nfs4_file *fi = ost->st_stid.sc_file;
        struct nfs4_openowner *oo = openowner(ost->st_stateowner);
        struct nfs4_client *cl = oo->oo_owner.so_client;
        struct inode *inode = cstate->current_fh.fh_dentry->d_inode;
        struct nfs4_lockowner *lo;
        unsigned int strhashval;
-       struct nfsd_net *nn = net_generic(cl->net, nfsd_net_id);
 
-       lo = find_lockowner_str(&cl->cl_clientid, &lock->v.new.owner, nn);
+       lo = find_lockowner_str(&cl->cl_clientid, &lock->v.new.owner, cl);
        if (!lo) {
-               strhashval = ownerstr_hashval(cl->cl_clientid.cl_id,
-                               &lock->v.new.owner);
+               strhashval = ownerstr_hashval(&lock->v.new.owner);
                lo = alloc_init_lock_stateowner(strhashval, cl, ost, lock);
                if (lo == NULL)
                        return nfserr_jukebox;
        } else {
                /* with an existing lockowner, seqids must be the same */
+               status = nfserr_bad_seqid;
                if (!cstate->minorversion &&
                    lock->lk_new_lock_seqid != lo->lo_owner.so_seqid)
-                       return nfserr_bad_seqid;
+                       goto out;
        }
 
        *lst = find_or_create_lock_stateid(lo, fi, inode, ost, new);
        if (*lst == NULL) {
-               release_lockowner_if_empty(lo);
-               return nfserr_jukebox;
+               status = nfserr_jukebox;
+               goto out;
        }
-       return nfs_ok;
+       status = nfs_ok;
+out:
+       nfs4_put_stateowner(&lo->lo_owner);
+       return status;
 }
 
 /*
@@ -4894,9 +5059,9 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
        struct file_lock *file_lock = NULL;
        struct file_lock *conflock = NULL;
        __be32 status = 0;
-       bool new_state = false;
        int lkflg;
        int err;
+       bool new = false;
        struct net *net = SVC_NET(rqstp);
        struct nfsd_net *nn = net_generic(net, nfsd_net_id);
 
@@ -4939,7 +5104,7 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                                                &lock->v.new.clientid))
                        goto out;
                status = lookup_or_create_lock_state(cstate, open_stp, lock,
-                                                       &lock_stp, &new_state);
+                                                       &lock_stp, &new);
        } else {
                status = nfs4_preprocess_seqid_op(cstate,
                                       lock->lk_old_lock_seqid,
@@ -5038,15 +5203,26 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 out:
        if (filp)
                fput(filp);
-       if (lock_stp)
+       if (lock_stp) {
+               /* Bump seqid manually if the 4.0 replay owner is openowner */
+               if (cstate->replay_owner &&
+                   cstate->replay_owner != &lock_sop->lo_owner &&
+                   seqid_mutating_err(ntohl(status)))
+                       lock_sop->lo_owner.so_seqid++;
+
+               /*
+                * If this is a new, never-before-used stateid, and we are
+                * returning an error, then just go ahead and release it.
+                */
+               if (status && new)
+                       release_lock_stateid(lock_stp);
+
                nfs4_put_stid(&lock_stp->st_stid);
+       }
        if (open_stp)
                nfs4_put_stid(&open_stp->st_stid);
-       if (status && new_state)
-               release_lock_stateid(lock_stp);
        nfsd4_bump_seqid(cstate, status);
-       if (!cstate->replay_owner)
-               nfs4_unlock_state();
+       nfs4_unlock_state();
        if (file_lock)
                locks_free_lock(file_lock);
        if (conflock)
@@ -5079,7 +5255,7 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
            struct nfsd4_lockt *lockt)
 {
        struct file_lock *file_lock = NULL;
-       struct nfs4_lockowner *lo;
+       struct nfs4_lockowner *lo = NULL;
        __be32 status;
        struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);
 
@@ -5122,7 +5298,8 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                goto out;
        }
 
-       lo = find_lockowner_str(&lockt->lt_clientid, &lockt->lt_owner, nn);
+       lo = find_lockowner_str(&lockt->lt_clientid, &lockt->lt_owner,
+                               cstate->clp);
        if (lo)
                file_lock->fl_owner = (fl_owner_t)lo;
        file_lock->fl_pid = current->tgid;
@@ -5142,6 +5319,8 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                nfs4_set_lock_denied(file_lock, &lockt->lt_denied);
        }
 out:
+       if (lo)
+               nfs4_put_stateowner(&lo->lo_owner);
        nfs4_unlock_state();
        if (file_lock)
                locks_free_lock(file_lock);
@@ -5210,8 +5389,7 @@ put_stateid:
        nfs4_put_stid(&stp->st_stid);
 out:
        nfsd4_bump_seqid(cstate, status);
-       if (!cstate->replay_owner)
-               nfs4_unlock_state();
+       nfs4_unlock_state();
        if (file_lock)
                locks_free_lock(file_lock);
        return status;
@@ -5260,13 +5438,14 @@ nfsd4_release_lockowner(struct svc_rqst *rqstp,
                        struct nfsd4_release_lockowner *rlockowner)
 {
        clientid_t *clid = &rlockowner->rl_clientid;
-       struct nfs4_stateowner *sop = NULL, *tmp;
-       struct nfs4_lockowner *lo;
+       struct nfs4_stateowner *sop;
+       struct nfs4_lockowner *lo = NULL;
        struct nfs4_ol_stateid *stp;
        struct xdr_netobj *owner = &rlockowner->rl_owner;
-       unsigned int hashval = ownerstr_hashval(clid->cl_id, owner);
+       unsigned int hashval = ownerstr_hashval(owner);
        __be32 status;
        struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);
+       struct nfs4_client *clp;
 
        dprintk("nfsd4_release_lockowner clientid: (%08x/%08x):\n",
                clid->cl_boot, clid->cl_id);
@@ -5277,33 +5456,31 @@ nfsd4_release_lockowner(struct svc_rqst *rqstp,
        if (status)
                goto out;
 
-       status = nfserr_locks_held;
-
+       clp = cstate->clp;
        /* Find the matching lock stateowner */
-       list_for_each_entry(tmp, &nn->ownerstr_hashtbl[hashval], so_strhash) {
-               if (tmp->so_is_open_owner)
+       spin_lock(&clp->cl_lock);
+       list_for_each_entry(sop, &clp->cl_ownerstr_hashtbl[hashval],
+                           so_strhash) {
+
+               if (sop->so_is_open_owner || !same_owner_str(sop, owner))
                        continue;
-               if (same_owner_str(tmp, owner, clid)) {
-                       sop = tmp;
-                       break;
-               }
-       }
 
-       /* No matching owner found, maybe a replay? Just declare victory... */
-       if (!sop) {
-               status = nfs_ok;
-               goto out;
-       }
+               /* see if there are still any locks associated with it */
+               lo = lockowner(sop);
+               list_for_each_entry(stp, &sop->so_stateids, st_perstateowner) {
+                       if (check_for_locks(stp->st_stid.sc_file, lo)) {
+                               status = nfserr_locks_held;
+                               spin_unlock(&clp->cl_lock);
+                               goto out;
+                       }
+               }
 
-       lo = lockowner(sop);
-       /* see if there are still any locks associated with it */
-       list_for_each_entry(stp, &sop->so_stateids, st_perstateowner) {
-               if (check_for_locks(stp->st_stid.sc_file, lo))
-                       goto out;
+               atomic_inc(&sop->so_count);
+               break;
        }
-
-       status = nfs_ok;
-       release_lockowner(lo);
+       spin_unlock(&clp->cl_lock);
+       if (lo)
+               release_lockowner(lo);
 out:
        nfs4_unlock_state();
        return status;
@@ -5645,10 +5822,6 @@ static int nfs4_state_create_net(struct net *net)
                        CLIENT_HASH_SIZE, GFP_KERNEL);
        if (!nn->unconf_id_hashtbl)
                goto err_unconf_id;
-       nn->ownerstr_hashtbl = kmalloc(sizeof(struct list_head) *
-                       OWNER_HASH_SIZE, GFP_KERNEL);
-       if (!nn->ownerstr_hashtbl)
-               goto err_ownerstr;
        nn->sessionid_hashtbl = kmalloc(sizeof(struct list_head) *
                        SESSION_HASH_SIZE, GFP_KERNEL);
        if (!nn->sessionid_hashtbl)
@@ -5658,8 +5831,6 @@ static int nfs4_state_create_net(struct net *net)
                INIT_LIST_HEAD(&nn->conf_id_hashtbl[i]);
                INIT_LIST_HEAD(&nn->unconf_id_hashtbl[i]);
        }
-       for (i = 0; i < OWNER_HASH_SIZE; i++)
-               INIT_LIST_HEAD(&nn->ownerstr_hashtbl[i]);
        for (i = 0; i < SESSION_HASH_SIZE; i++)
                INIT_LIST_HEAD(&nn->sessionid_hashtbl[i]);
        nn->conf_name_tree = RB_ROOT;
@@ -5675,8 +5846,6 @@ static int nfs4_state_create_net(struct net *net)
        return 0;
 
 err_sessionid:
-       kfree(nn->ownerstr_hashtbl);
-err_ownerstr:
        kfree(nn->unconf_id_hashtbl);
 err_unconf_id:
        kfree(nn->conf_id_hashtbl);
@@ -5706,7 +5875,6 @@ nfs4_state_destroy_net(struct net *net)
        }
 
        kfree(nn->sessionid_hashtbl);
-       kfree(nn->ownerstr_hashtbl);
        kfree(nn->unconf_id_hashtbl);
        kfree(nn->conf_id_hashtbl);
        put_net(net);