ceph: make readpages fully async
authorSage Weil <sage@newdream.net>
Wed, 3 Aug 2011 16:58:09 +0000 (09:58 -0700)
committerSage Weil <sage@newdream.net>
Tue, 25 Oct 2011 23:10:14 +0000 (16:10 -0700)
When we get a ->readpages() aop, submit async reads for all page ranges
in the provided page list.  Lock the pages immediately, so that VFS/MM
will block until the reads complete.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/addr.c

index 5a3953db81184170a8f221fc6c231996993724d2..5bb39a50f90425ca5caf4bcc0253513fc7c8f712 100644 (file)
@@ -228,102 +228,147 @@ static int ceph_readpage(struct file *filp, struct page *page)
 }
 
 /*
- * Build a vector of contiguous pages from the provided page list.
+ * Finish an async read(ahead) op.
  */
-static struct page **page_vector_from_list(struct list_head *page_list,
-                                          unsigned *nr_pages)
+static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
-       struct page **pages;
-       struct page *page;
-       int next_index, contig_pages = 0;
+       struct inode *inode = req->r_inode;
+       struct ceph_osd_reply_head *replyhead;
+       int rc, bytes;
+       int i;
 
-       /* build page vector */
-       pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
-       if (!pages)
-               return ERR_PTR(-ENOMEM);
+       /* parse reply */
+       replyhead = msg->front.iov_base;
+       WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+       rc = le32_to_cpu(replyhead->result);
+       bytes = le32_to_cpu(msg->hdr.data_len);
 
-       BUG_ON(list_empty(page_list));
-       next_index = list_entry(page_list->prev, struct page, lru)->index;
-       list_for_each_entry_reverse(page, page_list, lru) {
-               if (page->index == next_index) {
-                       dout("readpages page %d %p\n", contig_pages, page);
-                       pages[contig_pages] = page;
-                       contig_pages++;
-                       next_index++;
-               } else {
-                       break;
+       dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+
+       /* unlock all pages, zeroing any data we didn't read */
+       for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
+               struct page *page = req->r_pages[i];
+
+               if (bytes < (int)PAGE_CACHE_SIZE) {
+                       /* zero (remainder of) page */
+                       int s = bytes < 0 ? 0 : bytes;
+                       zero_user_segment(page, s, PAGE_CACHE_SIZE);
                }
+               dout("finish_read %p uptodate %p idx %lu\n", inode, page,
+                    page->index);
+               flush_dcache_page(page);
+               SetPageUptodate(page);
+               unlock_page(page);
+               page_cache_release(page);
        }
-       *nr_pages = contig_pages;
-       return pages;
+       kfree(req->r_pages);
 }
 
 /*
- * Read multiple pages.  Leave pages we don't read + unlock in page_list;
- * the caller (VM) cleans them up.
+ * start an async read(ahead) operation.  return nr_pages we submitted
+ * a read for on success, or negative error code.
  */
-static int ceph_readpages(struct file *file, struct address_space *mapping,
-                         struct list_head *page_list, unsigned nr_pages)
+static int start_read(struct inode *inode, struct list_head *page_list)
 {
-       struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
-       int rc = 0;
-       struct page **pages;
-       loff_t offset;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct page *page = list_entry(page_list->prev, struct page, lru);
+       struct ceph_osd_request *req;
+       u64 off;
        u64 len;
+       int i;
+       struct page **pages;
+       pgoff_t next_index;
+       int nr_pages = 0;
+       int ret;
 
-       dout("readpages %p file %p nr_pages %d\n",
-            inode, file, nr_pages);
-
-       pages = page_vector_from_list(page_list, &nr_pages);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+       off = page->index << PAGE_CACHE_SHIFT;
 
-       /* guess read extent */
-       offset = pages[0]->index << PAGE_CACHE_SHIFT;
+       /* count pages */
+       next_index = page->index;
+       list_for_each_entry_reverse(page, page_list, lru) {
+               if (page->index != next_index)
+                       break;
+               nr_pages++;
+               next_index++;
+       }
        len = nr_pages << PAGE_CACHE_SHIFT;
-       rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-                                offset, &len,
-                                ci->i_truncate_seq, ci->i_truncate_size,
-                                pages, nr_pages, 0);
-       if (rc == -ENOENT)
-               rc = 0;
-       if (rc < 0)
-               goto out;
-
-       for (; !list_empty(page_list) && len > 0;
-            rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
-               struct page *page =
-                       list_entry(page_list->prev, struct page, lru);
+       dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
+            off, len);
+
+       req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
+                                   off, &len,
+                                   CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+                                   NULL, 0,
+                                   ci->i_truncate_seq, ci->i_truncate_size,
+                                   NULL, false, 1, 0);
+       if (!req)
+               return -ENOMEM;
 
+       /* build page vector */
+       nr_pages = len >> PAGE_CACHE_SHIFT;
+       pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+       ret = -ENOMEM;
+       if (!pages)
+               goto out;
+       for (i = 0; i < nr_pages; ++i) {
+               page = list_entry(page_list->prev, struct page, lru);
+               BUG_ON(PageLocked(page));
                list_del(&page->lru);
-
-               if (rc < (int)PAGE_CACHE_SIZE) {
-                       /* zero (remainder of) page */
-                       int s = rc < 0 ? 0 : rc;
-                       zero_user_segment(page, s, PAGE_CACHE_SIZE);
-               }
-
-               if (add_to_page_cache_lru(page, mapping, page->index,
+               
+               dout("start_read %p adding %p idx %lu\n", inode, page,
+                    page->index);
+               if (add_to_page_cache_lru(page, &inode->i_data, page->index,
                                          GFP_NOFS)) {
                        page_cache_release(page);
-                       dout("readpages %p add_to_page_cache failed %p\n",
+                       dout("start_read %p add_to_page_cache failed %p\n",
                             inode, page);
-                       continue;
+                       nr_pages = i;
+                       goto out_pages;
                }
-               dout("readpages %p adding %p idx %lu\n", inode, page,
-                    page->index);
-               flush_dcache_page(page);
-               SetPageUptodate(page);
-               unlock_page(page);
-               page_cache_release(page);
+               pages[i] = page;
        }
-       rc = 0;
+       req->r_pages = pages;
+       req->r_num_pages = nr_pages;
+       req->r_callback = finish_read;
+       req->r_inode = inode;
+
+       dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
+       ret = ceph_osdc_start_request(osdc, req, false);
+       if (ret < 0)
+               goto out_pages;
+       ceph_osdc_put_request(req);
+       return nr_pages;
 
-out:
+out_pages:
+       ceph_release_page_vector(pages, nr_pages);
        kfree(pages);
+out:
+       ceph_osdc_put_request(req);
+       return ret;
+}
+
+
+/*
+ * Read multiple pages.  Leave pages we don't read + unlock in page_list;
+ * the caller (VM) cleans them up.
+ */
+static int ceph_readpages(struct file *file, struct address_space *mapping,
+                         struct list_head *page_list, unsigned nr_pages)
+{
+       struct inode *inode = file->f_dentry->d_inode;
+       int rc = 0;
+
+       dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages);
+       while (!list_empty(page_list)) {
+               rc = start_read(inode, page_list);
+               if (rc < 0)
+                       goto out;
+               BUG_ON(rc == 0);
+       }
+out:
+       dout("readpages %p file %p ret %d\n", inode, file, rc);
        return rc;
 }