ceph: explicitly specify page alignment in network messages
authorSage Weil <sage@newdream.net>
Tue, 9 Nov 2010 20:40:00 +0000 (12:40 -0800)
committerSage Weil <sage@newdream.net>
Tue, 9 Nov 2010 20:43:17 +0000 (12:43 -0800)
The alignment used for reading data into or out of pages used to be taken
from the data_off field in the message header.  This only worked as long
as the page alignment matched the object offset, breaking direct io to
non-page aligned offsets.

Instead, explicitly specify the page alignment next to the page vector
in the ceph_msg struct, and use that instead of the message header (which
probably shouldn't be trusted).  The alloc_msg callback is responsible for
filling in this field properly when it sets up the page vector.

Signed-off-by: Sage Weil <sage@newdream.net>
include/linux/ceph/messenger.h
net/ceph/messenger.c
net/ceph/osd_client.c

index 5956d62c3057139ac6634df8e13556659d07725e..a108b425fee25aff2cbf4dd1924c047b925f7ab0 100644 (file)
@@ -82,6 +82,7 @@ struct ceph_msg {
        struct ceph_buffer *middle;
        struct page **pages;            /* data payload.  NOT OWNER. */
        unsigned nr_pages;              /* size of page array */
+       unsigned page_alignment;        /* io offset in first page */
        struct ceph_pagelist *pagelist; /* instead of pages */
        struct list_head list_head;
        struct kref kref;
index d379abf873bcb4fb4c1b546ae8a5c44662f8e6f0..1c7a2ec4f3cc81ee608821ddf589fc0b27db4dcc 100644 (file)
@@ -540,8 +540,7 @@ static void prepare_write_message(struct ceph_connection *con)
                /* initialize page iterator */
                con->out_msg_pos.page = 0;
                if (m->pages)
-                       con->out_msg_pos.page_pos =
-                               le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+                       con->out_msg_pos.page_pos = m->page_alignment;
                else
                        con->out_msg_pos.page_pos = 0;
                con->out_msg_pos.data_pos = 0;
@@ -1491,7 +1490,7 @@ static int read_partial_message(struct ceph_connection *con)
        struct ceph_msg *m = con->in_msg;
        int ret;
        int to, left;
-       unsigned front_len, middle_len, data_len, data_off;
+       unsigned front_len, middle_len, data_len;
        int datacrc = con->msgr->nocrc;
        int skip;
        u64 seq;
@@ -1527,7 +1526,6 @@ static int read_partial_message(struct ceph_connection *con)
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
-       data_off = le16_to_cpu(con->in_hdr.data_off);
 
        /* verify seq# */
        seq = le64_to_cpu(con->in_hdr.seq);
@@ -1575,7 +1573,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                con->in_msg_pos.page = 0;
                if (m->pages)
-                       con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+                       con->in_msg_pos.page_pos = m->page_alignment;
                else
                        con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.data_pos = 0;
@@ -2300,6 +2298,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 
        /* data */
        m->nr_pages = 0;
+       m->page_alignment = 0;
        m->pages = NULL;
        m->pagelist = NULL;
        m->bio = NULL;
@@ -2369,6 +2368,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                               type, front_len);
                        return NULL;
                }
+               msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
        memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
index 6c096239660cbd8e6bc1edb35902314b31540c1a..3e20a122ffa2f2bf40a91596fe12bb93e6874b64 100644 (file)
@@ -391,6 +391,8 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
        }
 
+       req->r_request->page_alignment = req->r_page_alignment;
+
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
@@ -1657,6 +1659,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                }
                m->pages = req->r_pages;
                m->nr_pages = req->r_num_pages;
+               m->page_alignment = req->r_page_alignment;
 #ifdef CONFIG_BLOCK
                m->bio = req->r_bio;
 #endif