pNFS: Tighten up locking around DS commit buckets
authorTrond Myklebust <trond.myklebust@primarydata.com>
Mon, 3 Aug 2015 21:38:33 +0000 (17:38 -0400)
committerTrond Myklebust <trond.myklebust@primarydata.com>
Wed, 12 Aug 2015 18:56:18 +0000 (14:56 -0400)
I'm not aware of any bugreports around this issue, but the locking
around the pnfs_commit_bucket is inconsistent at best. This patch
tightens it up by ensuring that the 'bucket->committing' list is always
changed atomically w.r.t. the 'bucket->clseg' layout segment tracking.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
fs/nfs/pnfs_nfs.c

index 7a282876662f07519dd1424a7092507f666b2711..cd3c0403101bbf0975eabaa89c0c9ab01567aeb7 100644 (file)
@@ -124,11 +124,12 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
        if (ret) {
                cinfo->ds->nwritten -= ret;
                cinfo->ds->ncommitting += ret;
-               bucket->clseg = bucket->wlseg;
-               if (list_empty(src))
+               if (bucket->clseg == NULL)
+                       bucket->clseg = pnfs_get_lseg(bucket->wlseg);
+               if (list_empty(src)) {
+                       pnfs_put_lseg_locked(bucket->wlseg);
                        bucket->wlseg = NULL;
-               else
-                       pnfs_get_lseg(bucket->clseg);
+               }
        }
        return ret;
 }
@@ -182,19 +183,23 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
        struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
        struct pnfs_commit_bucket *bucket;
        struct pnfs_layout_segment *freeme;
+       LIST_HEAD(pages);
        int i;
 
+       spin_lock(cinfo->lock);
        for (i = idx; i < fl_cinfo->nbuckets; i++) {
                bucket = &fl_cinfo->buckets[i];
                if (list_empty(&bucket->committing))
                        continue;
-               nfs_retry_commit(&bucket->committing, bucket->clseg, cinfo, i);
-               spin_lock(cinfo->lock);
                freeme = bucket->clseg;
                bucket->clseg = NULL;
+               list_splice_init(&bucket->committing, &pages);
                spin_unlock(cinfo->lock);
+               nfs_retry_commit(&pages, freeme, cinfo, i);
                pnfs_put_lseg(freeme);
+               spin_lock(cinfo->lock);
        }
+       spin_unlock(cinfo->lock);
 }
 
 static unsigned int
@@ -216,10 +221,6 @@ pnfs_generic_alloc_ds_commits(struct nfs_commit_info *cinfo,
                if (!data)
                        break;
                data->ds_commit_index = i;
-               spin_lock(cinfo->lock);
-               data->lseg = bucket->clseg;
-               bucket->clseg = NULL;
-               spin_unlock(cinfo->lock);
                list_add(&data->pages, list);
                nreq++;
        }
@@ -229,6 +230,22 @@ pnfs_generic_alloc_ds_commits(struct nfs_commit_info *cinfo,
        return nreq;
 }
 
+static inline
+void pnfs_fetch_commit_bucket_list(struct list_head *pages,
+               struct nfs_commit_data *data,
+               struct nfs_commit_info *cinfo)
+{
+       struct pnfs_commit_bucket *bucket;
+
+       bucket = &cinfo->ds->buckets[data->ds_commit_index];
+       spin_lock(cinfo->lock);
+       list_splice_init(pages, &bucket->committing);
+       data->lseg = bucket->clseg;
+       bucket->clseg = NULL;
+       spin_unlock(cinfo->lock);
+
+}
+
 /* This follows nfs_commit_list pretty closely */
 int
 pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
@@ -243,7 +260,7 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
        if (!list_empty(mds_pages)) {
                data = nfs_commitdata_alloc();
                if (data != NULL) {
-                       data->lseg = NULL;
+                       data->ds_commit_index = -1;
                        list_add(&data->pages, &list);
                        nreq++;
                } else {
@@ -265,19 +282,16 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 
        list_for_each_entry_safe(data, tmp, &list, pages) {
                list_del_init(&data->pages);
-               if (!data->lseg) {
+               if (data->ds_commit_index < 0) {
                        nfs_init_commit(data, mds_pages, NULL, cinfo);
                        nfs_initiate_commit(NFS_CLIENT(inode), data,
                                            NFS_PROTO(data->inode),
                                            data->mds_ops, how, 0);
                } else {
-                       struct pnfs_commit_bucket *buckets;
+                       LIST_HEAD(pages);
 
-                       buckets = cinfo->ds->buckets;
-                       nfs_init_commit(data,
-                                       &buckets[data->ds_commit_index].committing,
-                                       data->lseg,
-                                       cinfo);
+                       pnfs_fetch_commit_bucket_list(&pages, data, cinfo);
+                       nfs_init_commit(data, &pages, data->lseg, cinfo);
                        initiate_commit(data, how);
                }
        }