ceph: make page alignment explicit in osd interface
authorSage Weil <sage@newdream.net>
Tue, 9 Nov 2010 20:43:12 +0000 (12:43 -0800)
committerSage Weil <sage@newdream.net>
Tue, 9 Nov 2010 20:43:12 +0000 (12:43 -0800)
We used to infer alignment of IOs within a page based on the file offset,
which assumed they matched.  This broke with direct IO that was not aligned
to pages (e.g., 512-byte aligned IO).  We were also trusting the alignment
specified in the OSD reply, which could have been adjusted by the server.

Explicitly specify the page alignment when setting up OSD IO requests.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/addr.c
fs/ceph/file.c
fs/ceph/inode.c
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index 51bcc5ce323024a995d300b4a6035ecf8e72e94b..4aa8577630370e1a788c289d5cb63421ce985bc5 100644 (file)
@@ -204,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                  page->index << PAGE_CACHE_SHIFT, &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
-                                 &page, 1);
+                                 &page, 1, 0);
        if (err == -ENOENT)
                err = 0;
        if (err < 0) {
@@ -287,7 +287,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                 offset, &len,
                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                pages, nr_pages);
+                                pages, nr_pages, 0);
        if (rc == -ENOENT)
                rc = 0;
        if (rc < 0)
@@ -782,7 +782,7 @@ get_more_pages:
                                            snapc, do_sync,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
-                                           &inode->i_mtime, true, 1);
+                                           &inode->i_mtime, true, 1, 0);
                                max_pages = req->r_num_pages;
 
                                alloc_page_vec(fsc, req);
index 603fd00af0a6821e0c2c718e698def03a1accff7..8d79b8912e31e4cb8fcb4961ec1f81d63c3d5070 100644 (file)
@@ -282,11 +282,12 @@ int ceph_release(struct inode *inode, struct file *file)
 static int striped_read(struct inode *inode,
                        u64 off, u64 len,
                        struct page **pages, int num_pages,
-                       int *checkeof)
+                       int *checkeof, bool align_to_pages)
 {
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 pos, this_len;
+       int io_align, page_align;
        int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */
        int left, pages_left;
        int read;
@@ -302,14 +303,19 @@ static int striped_read(struct inode *inode,
        page_pos = pages;
        pages_left = num_pages;
        read = 0;
+       io_align = off & ~PAGE_MASK;
 
 more:
+       if (align_to_pages)
+               page_align = (pos - io_align) & ~PAGE_MASK;
+       else
+               page_align = pos & ~PAGE_MASK;
        this_len = left;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
                                  ci->i_truncate_seq,
                                  ci->i_truncate_size,
-                                 page_pos, pages_left);
+                                 page_pos, pages_left, page_align);
        hit_stripe = this_len < left;
        was_short = ret >= 0 && ret < this_len;
        if (ret == -ENOENT)
@@ -393,7 +399,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
        if (ret < 0)
                goto done;
 
-       ret = striped_read(inode, off, len, pages, num_pages, checkeof);
+       ret = striped_read(inode, off, len, pages, num_pages, checkeof,
+                          file->f_flags & O_DIRECT);
 
        if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
                ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
@@ -448,6 +455,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
        int flags;
        int do_sync = 0;
        int check_caps = 0;
+       int page_align, io_align;
        int ret;
        struct timespec mtime = CURRENT_TIME;
 
@@ -462,6 +470,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
        else
                pos = *offset;
 
+       io_align = pos & ~PAGE_MASK;
+
        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
        if (ret < 0)
                return ret;
@@ -486,20 +496,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
         */
 more:
        len = left;
+       if (file->f_flags & O_DIRECT)
+               /* write from beginning of first page, regardless of
+                  io alignment */
+               page_align = (pos - io_align) & ~PAGE_MASK;
+       else
+               page_align = pos & ~PAGE_MASK;
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                    ceph_vino(inode), pos, &len,
                                    CEPH_OSD_OP_WRITE, flags,
                                    ci->i_snap_realm->cached_context,
                                    do_sync,
                                    ci->i_truncate_seq, ci->i_truncate_size,
-                                   &mtime, false, 2);
+                                   &mtime, false, 2, page_align);
        if (!req)
                return -ENOMEM;
 
        num_pages = calc_pages_for(pos, len);
 
        if (file->f_flags & O_DIRECT) {
-               pages = ceph_get_direct_page_vector(data, num_pages, pos, len);
+               pages = ceph_get_direct_page_vector(data, num_pages);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
index 7bc0fbd26af21511591649742cd43665c41b92fa..8153ee5a8d74a053f8777d33867a9b929e5a3afa 100644 (file)
@@ -1752,7 +1752,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
                return 0;
        }
 
-       dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
+       dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
        if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
                return 0;
 
index 6c91fb032c39cd907a3ac509db75a3a860e564c1..a1af29648fb54be5c0b5ac1cfc62879314145d56 100644 (file)
@@ -79,6 +79,7 @@ struct ceph_osd_request {
        struct ceph_file_layout r_file_layout;
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
        unsigned          r_num_pages;        /* size of page array (follows) */
+       unsigned          r_page_alignment;   /* io offset in first page */
        struct page     **r_pages;            /* pages for data payload */
        int               r_pages_from_pool;
        int               r_own_pages;        /* if true, i own page list */
@@ -194,7 +195,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      int do_sync, u32 truncate_seq,
                                      u64 truncate_size,
                                      struct timespec *mtime,
-                                     bool use_mempool, int num_reply);
+                                     bool use_mempool, int num_reply,
+                                     int page_align);
 
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {
@@ -218,7 +220,8 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct ceph_file_layout *layout,
                               u64 off, u64 *plen,
                               u32 truncate_seq, u64 truncate_size,
-                              struct page **pages, int nr_pages);
+                              struct page **pages, int nr_pages,
+                              int page_align);
 
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
index 79391994b3edee7d531b823c0720262090db6618..6c096239660cbd8e6bc1edb35902314b31540c1a 100644 (file)
@@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                op->extent.length = objlen;
        }
        req->r_num_pages = calc_pages_for(off, *plen);
+       req->r_page_alignment = off & ~PAGE_MASK;
        if (op->op == CEPH_OSD_OP_WRITE)
                op->payload_len = *plen;
 
@@ -419,7 +420,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
-                                              bool use_mempool, int num_reply)
+                                              bool use_mempool, int num_reply,
+                                              int page_align)
 {
        struct ceph_osd_req_op ops[3];
        struct ceph_osd_request *req;
@@ -447,6 +449,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        calc_layout(osdc, vino, layout, off, plen, req, ops);
        req->r_file_layout = *layout;  /* keep a copy */
 
+       /* in case it differs from natural alignment that calc_layout
+          filled in for us */
+       req->r_page_alignment = page_align;
+
        ceph_osdc_build_request(req, off, plen, ops,
                                snapc,
                                mtime,
@@ -1489,7 +1495,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 *plen,
                        u32 truncate_seq, u64 truncate_size,
-                       struct page **pages, int num_pages)
+                       struct page **pages, int num_pages, int page_align)
 {
        struct ceph_osd_request *req;
        int rc = 0;
@@ -1499,15 +1505,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
-                                   false, 1);
+                                   false, 1, page_align);
        if (!req)
                return -ENOMEM;
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
 
-       dout("readpages  final extent is %llu~%llu (%d pages)\n",
-            off, *plen, req->r_num_pages);
+       dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
+            off, *plen, req->r_num_pages, page_align);
 
        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
@@ -1533,6 +1539,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 {
        struct ceph_osd_request *req;
        int rc = 0;
+       int page_align = off & ~PAGE_MASK;
 
        BUG_ON(vino.snap != CEPH_NOSNAP);
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
@@ -1541,7 +1548,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                            CEPH_OSD_FLAG_WRITE,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
-                                   nofail, 1);
+                                   nofail, 1, page_align);
        if (!req)
                return -ENOMEM;
 
@@ -1638,8 +1645,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        m = ceph_msg_get(req->r_reply);
 
        if (data_len > 0) {
-               unsigned data_off = le16_to_cpu(hdr->data_off);
-               int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+               int want = calc_pages_for(req->r_page_alignment, data_len);
 
                if (unlikely(req->r_num_pages < want)) {
                        pr_warning("tid %lld reply %d > expected %d pages\n",