Merge branch 'sched-urgent-for-linus' of git://git.kernel.org/pub/scm/linux/kernel...
[firefly-linux-kernel-4.4.55.git] / fs / nfs / pnfs.c
index 0a5dda4d85c27b85437d4e7f427e4cb0af7d7526..4f802b02fbb9b0c6fc61912be044f6cef94acd32 100644 (file)
@@ -34,6 +34,7 @@
 #include "pnfs.h"
 #include "iostat.h"
 #include "nfs4trace.h"
+#include "delegation.h"
 
 #define NFSDBG_FACILITY                NFSDBG_PNFS
 #define PNFS_LAYOUTGET_RETRY_TIMEOUT (120*HZ)
@@ -50,6 +51,10 @@ static DEFINE_SPINLOCK(pnfs_spinlock);
  */
 static LIST_HEAD(pnfs_modules_tbl);
 
+static int
+pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, nfs4_stateid stateid,
+                      enum pnfs_iomode iomode, bool sync);
+
 /* Return the registered pnfs layout driver module matching given id */
 static struct pnfs_layoutdriver_type *
 find_pnfs_driver_locked(u32 id)
@@ -238,6 +243,8 @@ pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo)
        struct inode *inode = lo->plh_inode;
 
        if (atomic_dec_and_lock(&lo->plh_refcount, &inode->i_lock)) {
+               if (!list_empty(&lo->plh_segs))
+                       WARN_ONCE(1, "NFS: BUG unfreed layout segments.\n");
                pnfs_detach_layout_hdr(lo);
                spin_unlock(&inode->i_lock);
                pnfs_free_layout_hdr(lo);
@@ -337,6 +344,48 @@ pnfs_layout_remove_lseg(struct pnfs_layout_hdr *lo,
        rpc_wake_up(&NFS_SERVER(inode)->roc_rpcwaitq);
 }
 
+/* Return true if layoutreturn is needed */
+static bool
+pnfs_layout_need_return(struct pnfs_layout_hdr *lo,
+                       struct pnfs_layout_segment *lseg)
+{
+       struct pnfs_layout_segment *s;
+
+       if (!test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags))
+               return false;
+
+       list_for_each_entry(s, &lo->plh_segs, pls_list)
+               if (s != lseg && test_bit(NFS_LSEG_LAYOUTRETURN, &s->pls_flags))
+                       return false;
+
+       return true;
+}
+
+static void pnfs_layoutreturn_before_put_lseg(struct pnfs_layout_segment *lseg,
+               struct pnfs_layout_hdr *lo, struct inode *inode)
+{
+       lo = lseg->pls_layout;
+       inode = lo->plh_inode;
+
+       spin_lock(&inode->i_lock);
+       if (pnfs_layout_need_return(lo, lseg)) {
+               nfs4_stateid stateid;
+               enum pnfs_iomode iomode;
+
+               stateid = lo->plh_stateid;
+               iomode = lo->plh_return_iomode;
+               /* decreased in pnfs_send_layoutreturn() */
+               lo->plh_block_lgets++;
+               lo->plh_return_iomode = 0;
+               spin_unlock(&inode->i_lock);
+               pnfs_get_layout_hdr(lo);
+
+               /* Send an async layoutreturn so we don't deadlock */
+               pnfs_send_layoutreturn(lo, stateid, iomode, false);
+       } else
+               spin_unlock(&inode->i_lock);
+}
+
 void
 pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 {
@@ -349,8 +398,17 @@ pnfs_put_lseg(struct pnfs_layout_segment *lseg)
        dprintk("%s: lseg %p ref %d valid %d\n", __func__, lseg,
                atomic_read(&lseg->pls_refcount),
                test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
+
+       /* Handle the case where refcount != 1 */
+       if (atomic_add_unless(&lseg->pls_refcount, -1, 1))
+               return;
+
        lo = lseg->pls_layout;
        inode = lo->plh_inode;
+       /* Do we need a layoutreturn? */
+       if (test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags))
+               pnfs_layoutreturn_before_put_lseg(lseg, lo, inode);
+
        if (atomic_dec_and_lock(&lseg->pls_refcount, &inode->i_lock)) {
                pnfs_get_layout_hdr(lo);
                pnfs_layout_remove_lseg(lo, lseg);
@@ -543,6 +601,7 @@ pnfs_destroy_layout(struct nfs_inode *nfsi)
                pnfs_get_layout_hdr(lo);
                pnfs_layout_clear_fail_bit(lo, NFS_LAYOUT_RO_FAILED);
                pnfs_layout_clear_fail_bit(lo, NFS_LAYOUT_RW_FAILED);
+               pnfs_clear_retry_layoutget(lo);
                spin_unlock(&nfsi->vfs_inode.i_lock);
                pnfs_free_lseg_list(&tmp_list);
                pnfs_put_layout_hdr(lo);
@@ -740,25 +799,37 @@ pnfs_layout_stateid_blocked(const struct pnfs_layout_hdr *lo,
        return !pnfs_seqid_is_newer(seqid, lo->plh_barrier);
 }
 
+static bool
+pnfs_layout_returning(const struct pnfs_layout_hdr *lo,
+                     struct pnfs_layout_range *range)
+{
+       return test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags) &&
+               (lo->plh_return_iomode == IOMODE_ANY ||
+                lo->plh_return_iomode == range->iomode);
+}
+
 /* lget is set to 1 if called from inside send_layoutget call chain */
 static bool
-pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo, int lget)
+pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo,
+                       struct pnfs_layout_range *range, int lget)
 {
        return lo->plh_block_lgets ||
                test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
                (list_empty(&lo->plh_segs) &&
-                (atomic_read(&lo->plh_outstanding) > lget));
+                (atomic_read(&lo->plh_outstanding) > lget)) ||
+               pnfs_layout_returning(lo, range);
 }
 
 int
 pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
+                             struct pnfs_layout_range *range,
                              struct nfs4_state *open_state)
 {
        int status = 0;
 
        dprintk("--> %s\n", __func__);
        spin_lock(&lo->plh_inode->i_lock);
-       if (pnfs_layoutgets_blocked(lo, 1)) {
+       if (pnfs_layoutgets_blocked(lo, range, 1)) {
                status = -EAGAIN;
        } else if (!nfs4_valid_open_stateid(open_state)) {
                status = -EBADF;
@@ -825,7 +896,9 @@ send_layoutget(struct pnfs_layout_hdr *lo,
                        pnfs_layout_io_set_failed(lo, range->iomode);
                }
                return NULL;
-       }
+       } else
+               pnfs_layout_clear_fail_bit(lo,
+                               pnfs_iomode_to_fail_bit(range->iomode));
 
        return lseg;
 }
@@ -845,6 +918,49 @@ static void pnfs_clear_layoutcommit(struct inode *inode,
        }
 }
 
+void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo)
+{
+       clear_bit_unlock(NFS_LAYOUT_RETURN, &lo->plh_flags);
+       smp_mb__after_atomic();
+       wake_up_bit(&lo->plh_flags, NFS_LAYOUT_RETURN);
+}
+
+static int
+pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, nfs4_stateid stateid,
+                      enum pnfs_iomode iomode, bool sync)
+{
+       struct inode *ino = lo->plh_inode;
+       struct nfs4_layoutreturn *lrp;
+       int status = 0;
+
+       lrp = kzalloc(sizeof(*lrp), GFP_NOFS);
+       if (unlikely(lrp == NULL)) {
+               status = -ENOMEM;
+               spin_lock(&ino->i_lock);
+               lo->plh_block_lgets--;
+               pnfs_clear_layoutreturn_waitbit(lo);
+               rpc_wake_up(&NFS_SERVER(ino)->roc_rpcwaitq);
+               spin_unlock(&ino->i_lock);
+               pnfs_put_layout_hdr(lo);
+               goto out;
+       }
+
+       lrp->args.stateid = stateid;
+       lrp->args.layout_type = NFS_SERVER(ino)->pnfs_curr_ld->id;
+       lrp->args.inode = ino;
+       lrp->args.range.iomode = iomode;
+       lrp->args.range.offset = 0;
+       lrp->args.range.length = NFS4_MAX_UINT64;
+       lrp->args.layout = lo;
+       lrp->clp = NFS_SERVER(ino)->nfs_client;
+       lrp->cred = lo->plh_lc_cred;
+
+       status = nfs4_proc_layoutreturn(lrp, sync);
+out:
+       dprintk("<-- %s status: %d\n", __func__, status);
+       return status;
+}
+
 /*
  * Initiates a LAYOUTRETURN(FILE), and removes the pnfs_layout_hdr
  * when the layout segment list is empty.
@@ -859,7 +975,6 @@ _pnfs_return_layout(struct inode *ino)
        struct pnfs_layout_hdr *lo = NULL;
        struct nfs_inode *nfsi = NFS_I(ino);
        LIST_HEAD(tmp_list);
-       struct nfs4_layoutreturn *lrp;
        nfs4_stateid stateid;
        int status = 0, empty;
 
@@ -901,24 +1016,7 @@ _pnfs_return_layout(struct inode *ino)
        spin_unlock(&ino->i_lock);
        pnfs_free_lseg_list(&tmp_list);
 
-       lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
-       if (unlikely(lrp == NULL)) {
-               status = -ENOMEM;
-               spin_lock(&ino->i_lock);
-               lo->plh_block_lgets--;
-               spin_unlock(&ino->i_lock);
-               pnfs_put_layout_hdr(lo);
-               goto out;
-       }
-
-       lrp->args.stateid = stateid;
-       lrp->args.layout_type = NFS_SERVER(ino)->pnfs_curr_ld->id;
-       lrp->args.inode = ino;
-       lrp->args.layout = lo;
-       lrp->clp = NFS_SERVER(ino)->nfs_client;
-       lrp->cred = lo->plh_lc_cred;
-
-       status = nfs4_proc_layoutreturn(lrp);
+       status = pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, true);
 out:
        dprintk("<-- %s status: %d\n", __func__, status);
        return status;
@@ -954,31 +1052,60 @@ pnfs_commit_and_return_layout(struct inode *inode)
 
 bool pnfs_roc(struct inode *ino)
 {
+       struct nfs_inode *nfsi = NFS_I(ino);
+       struct nfs_open_context *ctx;
+       struct nfs4_state *state;
        struct pnfs_layout_hdr *lo;
        struct pnfs_layout_segment *lseg, *tmp;
+       nfs4_stateid stateid;
        LIST_HEAD(tmp_list);
-       bool found = false;
+       bool found = false, layoutreturn = false;
 
        spin_lock(&ino->i_lock);
-       lo = NFS_I(ino)->layout;
+       lo = nfsi->layout;
        if (!lo || !test_and_clear_bit(NFS_LAYOUT_ROC, &lo->plh_flags) ||
            test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags))
-               goto out_nolayout;
+               goto out_noroc;
+
+       /* Don't return layout if we hold a delegation */
+       if (nfs4_check_delegation(ino, FMODE_READ))
+               goto out_noroc;
+
+       list_for_each_entry(ctx, &nfsi->open_files, list) {
+               state = ctx->state;
+               /* Don't return layout if there is open file state */
+               if (state != NULL && state->state != 0)
+                       goto out_noroc;
+       }
+
+       pnfs_clear_retry_layoutget(lo);
        list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list)
                if (test_bit(NFS_LSEG_ROC, &lseg->pls_flags)) {
                        mark_lseg_invalid(lseg, &tmp_list);
                        found = true;
                }
        if (!found)
-               goto out_nolayout;
+               goto out_noroc;
        lo->plh_block_lgets++;
        pnfs_get_layout_hdr(lo); /* matched in pnfs_roc_release */
        spin_unlock(&ino->i_lock);
        pnfs_free_lseg_list(&tmp_list);
        return true;
 
-out_nolayout:
+out_noroc:
+       if (lo) {
+               stateid = lo->plh_stateid;
+               layoutreturn =
+                       test_and_clear_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
+                                          &lo->plh_flags);
+               if (layoutreturn) {
+                       lo->plh_block_lgets++;
+                       pnfs_get_layout_hdr(lo);
+               }
+       }
        spin_unlock(&ino->i_lock);
+       if (layoutreturn)
+               pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, true);
        return false;
 }
 
@@ -1013,8 +1140,9 @@ bool pnfs_roc_drain(struct inode *ino, u32 *barrier, struct rpc_task *task)
        struct nfs_inode *nfsi = NFS_I(ino);
        struct pnfs_layout_hdr *lo;
        struct pnfs_layout_segment *lseg;
+       nfs4_stateid stateid;
        u32 current_seqid;
-       bool found = false;
+       bool found = false, layoutreturn = false;
 
        spin_lock(&ino->i_lock);
        list_for_each_entry(lseg, &nfsi->layout->plh_segs, pls_list)
@@ -1031,7 +1159,21 @@ bool pnfs_roc_drain(struct inode *ino, u32 *barrier, struct rpc_task *task)
         */
        *barrier = current_seqid + atomic_read(&lo->plh_outstanding);
 out:
+       if (!found) {
+               stateid = lo->plh_stateid;
+               layoutreturn =
+                       test_and_clear_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
+                                          &lo->plh_flags);
+               if (layoutreturn) {
+                       lo->plh_block_lgets++;
+                       pnfs_get_layout_hdr(lo);
+               }
+       }
        spin_unlock(&ino->i_lock);
+       if (layoutreturn) {
+               rpc_sleep_on(&NFS_SERVER(ino)->roc_rpcwaitq, task, NULL);
+               pnfs_send_layoutreturn(lo, stateid, IOMODE_ANY, false);
+       }
        return found;
 }
 
@@ -1178,6 +1320,7 @@ pnfs_find_lseg(struct pnfs_layout_hdr *lo,
 
        list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
                if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) &&
+                   !test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) &&
                    pnfs_lseg_range_match(&lseg->pls_range, range)) {
                        ret = pnfs_get_lseg(lseg);
                        break;
@@ -1266,6 +1409,35 @@ static bool pnfs_within_mdsthreshold(struct nfs_open_context *ctx,
        return ret;
 }
 
+/* stop waiting if someone clears NFS_LAYOUT_RETRY_LAYOUTGET bit. */
+static int pnfs_layoutget_retry_bit_wait(struct wait_bit_key *key)
+{
+       if (!test_bit(NFS_LAYOUT_RETRY_LAYOUTGET, key->flags))
+               return 1;
+       return nfs_wait_bit_killable(key);
+}
+
+static bool pnfs_prepare_to_retry_layoutget(struct pnfs_layout_hdr *lo)
+{
+       /*
+        * send layoutcommit as it can hold up layoutreturn due to lseg
+        * reference
+        */
+       pnfs_layoutcommit_inode(lo->plh_inode, false);
+       return !wait_on_bit_action(&lo->plh_flags, NFS_LAYOUT_RETURN,
+                                  pnfs_layoutget_retry_bit_wait,
+                                  TASK_UNINTERRUPTIBLE);
+}
+
+static void pnfs_clear_first_layoutget(struct pnfs_layout_hdr *lo)
+{
+       unsigned long *bitlock = &lo->plh_flags;
+
+       clear_bit_unlock(NFS_LAYOUT_FIRST_LAYOUTGET, bitlock);
+       smp_mb__after_atomic();
+       wake_up_bit(bitlock, NFS_LAYOUT_FIRST_LAYOUTGET);
+}
+
 /*
  * Layout segment is retreived from the server if not cached.
  * The appropriate layout segment is referenced and returned to the caller.
@@ -1296,6 +1468,8 @@ pnfs_update_layout(struct inode *ino,
        if (pnfs_within_mdsthreshold(ctx, ino, iomode))
                goto out;
 
+lookup_again:
+       first = false;
        spin_lock(&ino->i_lock);
        lo = pnfs_find_alloc_layout(ino, ctx, gfp_flags);
        if (lo == NULL) {
@@ -1310,27 +1484,62 @@ pnfs_update_layout(struct inode *ino,
        }
 
        /* if LAYOUTGET already failed once we don't try again */
-       if (pnfs_layout_io_test_failed(lo, iomode))
+       if (pnfs_layout_io_test_failed(lo, iomode) &&
+           !pnfs_should_retry_layoutget(lo))
                goto out_unlock;
 
-       /* Check to see if the layout for the given range already exists */
-       lseg = pnfs_find_lseg(lo, &arg);
-       if (lseg)
-               goto out_unlock;
+       first = list_empty(&lo->plh_segs);
+       if (first) {
+               /* The first layoutget for the file. Need to serialize per
+                * RFC 5661 Errata 3208.
+                */
+               if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET,
+                                    &lo->plh_flags)) {
+                       spin_unlock(&ino->i_lock);
+                       wait_on_bit(&lo->plh_flags, NFS_LAYOUT_FIRST_LAYOUTGET,
+                                   TASK_UNINTERRUPTIBLE);
+                       pnfs_put_layout_hdr(lo);
+                       goto lookup_again;
+               }
+       } else {
+               /* Check to see if the layout for the given range
+                * already exists
+                */
+               lseg = pnfs_find_lseg(lo, &arg);
+               if (lseg)
+                       goto out_unlock;
+       }
+
+       /*
+        * Because we free lsegs before sending LAYOUTRETURN, we need to wait
+        * for LAYOUTRETURN even if first is true.
+        */
+       if (!lseg && pnfs_should_retry_layoutget(lo) &&
+           test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags)) {
+               spin_unlock(&ino->i_lock);
+               dprintk("%s wait for layoutreturn\n", __func__);
+               if (pnfs_prepare_to_retry_layoutget(lo)) {
+                       if (first)
+                               pnfs_clear_first_layoutget(lo);
+                       pnfs_put_layout_hdr(lo);
+                       dprintk("%s retrying\n", __func__);
+                       goto lookup_again;
+               }
+               goto out_put_layout_hdr;
+       }
 
-       if (pnfs_layoutgets_blocked(lo, 0))
+       if (pnfs_layoutgets_blocked(lo, &arg, 0))
                goto out_unlock;
        atomic_inc(&lo->plh_outstanding);
-
-       first = list_empty(&lo->plh_layouts) ? true : false;
        spin_unlock(&ino->i_lock);
 
-       if (first) {
+       if (list_empty(&lo->plh_layouts)) {
                /* The lo must be on the clp list if there is any
                 * chance of a CB_LAYOUTRECALL(FILE) coming in.
                 */
                spin_lock(&clp->cl_lock);
-               list_add_tail(&lo->plh_layouts, &server->layouts);
+               if (list_empty(&lo->plh_layouts))
+                       list_add_tail(&lo->plh_layouts, &server->layouts);
                spin_unlock(&clp->cl_lock);
        }
 
@@ -1343,8 +1552,11 @@ pnfs_update_layout(struct inode *ino,
                arg.length = PAGE_CACHE_ALIGN(arg.length);
 
        lseg = send_layoutget(lo, ctx, &arg, gfp_flags);
+       pnfs_clear_retry_layoutget(lo);
        atomic_dec(&lo->plh_outstanding);
 out_put_layout_hdr:
+       if (first)
+               pnfs_clear_first_layoutget(lo);
        pnfs_put_layout_hdr(lo);
 out:
        dprintk("%s: inode %s/%llu pNFS layout segment %s for "
@@ -1393,7 +1605,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
                goto out_forget_reply;
        }
 
-       if (pnfs_layoutgets_blocked(lo, 1)) {
+       if (pnfs_layoutgets_blocked(lo, &lgp->args.range, 1)) {
                dprintk("%s forget reply due to state\n", __func__);
                goto out_forget_reply;
        }
@@ -1440,24 +1652,79 @@ out_forget_reply:
        goto out;
 }
 
+static void
+pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
+                               struct list_head *tmp_list,
+                               struct pnfs_layout_range *return_range)
+{
+       struct pnfs_layout_segment *lseg, *next;
+
+       dprintk("%s:Begin lo %p\n", __func__, lo);
+
+       if (list_empty(&lo->plh_segs))
+               return;
+
+       list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
+               if (should_free_lseg(&lseg->pls_range, return_range)) {
+                       dprintk("%s: marking lseg %p iomode %d "
+                               "offset %llu length %llu\n", __func__,
+                               lseg, lseg->pls_range.iomode,
+                               lseg->pls_range.offset,
+                               lseg->pls_range.length);
+                       set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
+                       mark_lseg_invalid(lseg, tmp_list);
+               }
+}
+
+void pnfs_error_mark_layout_for_return(struct inode *inode,
+                                      struct pnfs_layout_segment *lseg)
+{
+       struct pnfs_layout_hdr *lo = NFS_I(inode)->layout;
+       int iomode = pnfs_iomode_to_fail_bit(lseg->pls_range.iomode);
+       struct pnfs_layout_range range = {
+               .iomode = lseg->pls_range.iomode,
+               .offset = 0,
+               .length = NFS4_MAX_UINT64,
+       };
+       LIST_HEAD(free_me);
+
+       spin_lock(&inode->i_lock);
+       /* set failure bit so that pnfs path will be retried later */
+       pnfs_layout_set_fail_bit(lo, iomode);
+       set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags);
+       if (lo->plh_return_iomode == 0)
+               lo->plh_return_iomode = range.iomode;
+       else if (lo->plh_return_iomode != range.iomode)
+               lo->plh_return_iomode = IOMODE_ANY;
+       /*
+        * mark all matching lsegs so that we are sure to have no live
+        * segments at hand when sending layoutreturn. See pnfs_put_lseg()
+        * for how it works.
+        */
+       pnfs_mark_matching_lsegs_return(lo, &free_me, &range);
+       spin_unlock(&inode->i_lock);
+       pnfs_free_lseg_list(&free_me);
+}
+EXPORT_SYMBOL_GPL(pnfs_error_mark_layout_for_return);
+
 void
 pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
        u64 rd_size = req->wb_bytes;
 
-       WARN_ON_ONCE(pgio->pg_lseg != NULL);
-
-       if (pgio->pg_dreq == NULL)
-               rd_size = i_size_read(pgio->pg_inode) - req_offset(req);
-       else
-               rd_size = nfs_dreq_bytes_left(pgio->pg_dreq);
-
-       pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-                                          req->wb_context,
-                                          req_offset(req),
-                                          rd_size,
-                                          IOMODE_READ,
-                                          GFP_KERNEL);
+       if (pgio->pg_lseg == NULL) {
+               if (pgio->pg_dreq == NULL)
+                       rd_size = i_size_read(pgio->pg_inode) - req_offset(req);
+               else
+                       rd_size = nfs_dreq_bytes_left(pgio->pg_dreq);
+
+               pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+                                                  req->wb_context,
+                                                  req_offset(req),
+                                                  rd_size,
+                                                  IOMODE_READ,
+                                                  GFP_KERNEL);
+       }
        /* If no lseg, fall back to read through mds */
        if (pgio->pg_lseg == NULL)
                nfs_pageio_reset_read_mds(pgio);
@@ -1469,27 +1736,36 @@ void
 pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
                           struct nfs_page *req, u64 wb_size)
 {
-       WARN_ON_ONCE(pgio->pg_lseg != NULL);
-
-       pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-                                          req->wb_context,
-                                          req_offset(req),
-                                          wb_size,
-                                          IOMODE_RW,
-                                          GFP_NOFS);
+       if (pgio->pg_lseg == NULL)
+               pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+                                                  req->wb_context,
+                                                  req_offset(req),
+                                                  wb_size,
+                                                  IOMODE_RW,
+                                                  GFP_NOFS);
        /* If no lseg, fall back to write through mds */
        if (pgio->pg_lseg == NULL)
                nfs_pageio_reset_write_mds(pgio);
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_init_write);
 
+void
+pnfs_generic_pg_cleanup(struct nfs_pageio_descriptor *desc)
+{
+       if (desc->pg_lseg) {
+               pnfs_put_lseg(desc->pg_lseg);
+               desc->pg_lseg = NULL;
+       }
+}
+EXPORT_SYMBOL_GPL(pnfs_generic_pg_cleanup);
+
 /*
  * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
  * of bytes (maximum @req->wb_bytes) that can be coalesced.
  */
 size_t
-pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
-                    struct nfs_page *req)
+pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio,
+                    struct nfs_page *prev, struct nfs_page *req)
 {
        unsigned int size;
        u64 seg_end, req_start, seg_left;
@@ -1513,10 +1789,16 @@ pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
                seg_end = end_offset(pgio->pg_lseg->pls_range.offset,
                                     pgio->pg_lseg->pls_range.length);
                req_start = req_offset(req);
-               WARN_ON_ONCE(req_start > seg_end);
+               WARN_ON_ONCE(req_start >= seg_end);
                /* start of request is past the last byte of this segment */
-               if (req_start >= seg_end)
+               if (req_start >= seg_end) {
+                       /* reference the new lseg */
+                       if (pgio->pg_ops->pg_cleanup)
+                               pgio->pg_ops->pg_cleanup(pgio);
+                       if (pgio->pg_ops->pg_init)
+                               pgio->pg_ops->pg_init(pgio, req);
                        return 0;
+               }
 
                /* adjust 'size' iff there are fewer bytes left in the
                 * segment than what nfs_generic_pg_test returned */
@@ -1571,10 +1853,12 @@ static void
 pnfs_write_through_mds(struct nfs_pageio_descriptor *desc,
                struct nfs_pgio_header *hdr)
 {
+       struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
        if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
-               list_splice_tail_init(&hdr->pages, &desc->pg_list);
+               list_splice_tail_init(&hdr->pages, &mirror->pg_list);
                nfs_pageio_reset_write_mds(desc);
-               desc->pg_recoalesce = 1;
+               mirror->pg_recoalesce = 1;
        }
        nfs_pgio_data_destroy(hdr);
 }
@@ -1608,11 +1892,9 @@ pnfs_do_write(struct nfs_pageio_descriptor *desc,
        struct pnfs_layout_segment *lseg = desc->pg_lseg;
        enum pnfs_try_status trypnfs;
 
-       desc->pg_lseg = NULL;
        trypnfs = pnfs_try_to_write_data(hdr, call_ops, lseg, how);
        if (trypnfs == PNFS_NOT_ATTEMPTED)
                pnfs_write_through_mds(desc, hdr);
-       pnfs_put_lseg(lseg);
 }
 
 static void pnfs_writehdr_free(struct nfs_pgio_header *hdr)
@@ -1625,24 +1907,23 @@ EXPORT_SYMBOL_GPL(pnfs_writehdr_free);
 int
 pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
 {
+       struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
        struct nfs_pgio_header *hdr;
        int ret;
 
        hdr = nfs_pgio_header_alloc(desc->pg_rw_ops);
        if (!hdr) {
-               desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-               pnfs_put_lseg(desc->pg_lseg);
-               desc->pg_lseg = NULL;
+               desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
                return -ENOMEM;
        }
        nfs_pgheader_init(desc, hdr, pnfs_writehdr_free);
+
        hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
        ret = nfs_generic_pgio(desc, hdr);
-       if (ret != 0) {
-               pnfs_put_lseg(desc->pg_lseg);
-               desc->pg_lseg = NULL;
-       } else
+       if (!ret)
                pnfs_do_write(desc, hdr, desc->pg_ioflags);
+
        return ret;
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_writepages);
@@ -1687,10 +1968,12 @@ static void
 pnfs_read_through_mds(struct nfs_pageio_descriptor *desc,
                struct nfs_pgio_header *hdr)
 {
+       struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
        if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
-               list_splice_tail_init(&hdr->pages, &desc->pg_list);
+               list_splice_tail_init(&hdr->pages, &mirror->pg_list);
                nfs_pageio_reset_read_mds(desc);
-               desc->pg_recoalesce = 1;
+               mirror->pg_recoalesce = 1;
        }
        nfs_pgio_data_destroy(hdr);
 }
@@ -1719,18 +2002,29 @@ pnfs_try_to_read_data(struct nfs_pgio_header *hdr,
        return trypnfs;
 }
 
+/* Resend all requests through pnfs. */
+int pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr)
+{
+       struct nfs_pageio_descriptor pgio;
+
+       nfs_pageio_init_read(&pgio, hdr->inode, false, hdr->completion_ops);
+       return nfs_pageio_resend(&pgio, hdr);
+}
+EXPORT_SYMBOL_GPL(pnfs_read_resend_pnfs);
+
 static void
 pnfs_do_read(struct nfs_pageio_descriptor *desc, struct nfs_pgio_header *hdr)
 {
        const struct rpc_call_ops *call_ops = desc->pg_rpc_callops;
        struct pnfs_layout_segment *lseg = desc->pg_lseg;
        enum pnfs_try_status trypnfs;
+       int err = 0;
 
-       desc->pg_lseg = NULL;
        trypnfs = pnfs_try_to_read_data(hdr, call_ops, lseg);
-       if (trypnfs == PNFS_NOT_ATTEMPTED)
+       if (trypnfs == PNFS_TRY_AGAIN)
+               err = pnfs_read_resend_pnfs(hdr);
+       if (trypnfs == PNFS_NOT_ATTEMPTED || err)
                pnfs_read_through_mds(desc, hdr);
-       pnfs_put_lseg(lseg);
 }
 
 static void pnfs_readhdr_free(struct nfs_pgio_header *hdr)
@@ -1743,24 +2037,20 @@ EXPORT_SYMBOL_GPL(pnfs_readhdr_free);
 int
 pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
 {
+       struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
+
        struct nfs_pgio_header *hdr;
        int ret;
 
        hdr = nfs_pgio_header_alloc(desc->pg_rw_ops);
        if (!hdr) {
-               desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-               ret = -ENOMEM;
-               pnfs_put_lseg(desc->pg_lseg);
-               desc->pg_lseg = NULL;
-               return ret;
+               desc->pg_completion_ops->error_cleanup(&mirror->pg_list);
+               return -ENOMEM;
        }
        nfs_pgheader_init(desc, hdr, pnfs_readhdr_free);
        hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
        ret = nfs_generic_pgio(desc, hdr);
-       if (ret != 0) {
-               pnfs_put_lseg(desc->pg_lseg);
-               desc->pg_lseg = NULL;
-       } else
+       if (!ret)
                pnfs_do_read(desc, hdr);
        return ret;
 }
@@ -1966,6 +2256,7 @@ clear_layoutcommitting:
        pnfs_clear_layoutcommitting(inode);
        goto out;
 }
+EXPORT_SYMBOL_GPL(pnfs_layoutcommit_inode);
 
 struct nfs4_threshold *pnfs_mdsthreshold_alloc(void)
 {