libceph: isolate message page field manipulation
authorAlex Elder <elder@inktank.com>
Thu, 14 Feb 2013 18:16:43 +0000 (12:16 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:16:38 +0000 (21:16 -0700)
Define a function ceph_msg_data_set_pages(), which more clearly
abstracts the assignment page-related fields for data in a ceph
message structure.  Use this new function in the osd client and mds
client.

Ideally, these fields would never be set more than once (with
BUG_ON() calls to guarantee that).  At the moment though the osd
client sets these every time it receives a message, and in the event
of a communication problem this can happen more than once.  (This
will be resolved shortly, but setting up these helpers first makes
it all a bit easier to work with.)

Rearrange the field order in a ceph_msg structure to group those
that are used to define the possible data payloads.

This partially resolves:
    http://tracker.ceph.com/issues/4263

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
fs/ceph/mds_client.c
include/linux/ceph/messenger.h
net/ceph/messenger.c
net/ceph/osd_client.c

index ecfb738bca30a1886a8d3f0da4accfc3d79564bb..90198a4070231d0f79bde603945b25a154a61ee4 100644 (file)
@@ -1721,8 +1721,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
-       msg->pages = req->r_pages;
-       msg->page_count = req->r_num_pages;
+       ceph_msg_data_set_pages(msg, req->r_pages, req->r_num_pages, 0);
+
        msg->hdr.data_len = cpu_to_le32(req->r_data_len);
        msg->hdr.data_off = cpu_to_le16(0);
 
index 6c118748a7f8f280cb4febe4565bf30280fb91c7..aa463b9b30af9acebaa8f85b434b606b1912f6a9 100644 (file)
@@ -74,21 +74,22 @@ struct ceph_msg {
        struct ceph_msg_footer footer;  /* footer */
        struct kvec front;              /* unaligned blobs of message */
        struct ceph_buffer *middle;
-       struct page **pages;            /* data payload.  NOT OWNER. */
-       unsigned page_count;            /* size of page array */
-       unsigned page_alignment;        /* io offset in first page */
-       struct ceph_pagelist *pagelist; /* instead of pages */
-
-       struct ceph_connection *con;
-       struct list_head list_head;
 
-       struct kref kref;
+       struct page **pages;            /* data payload.  NOT OWNER. */
+       unsigned int page_alignment;    /* io offset in first page */
+       unsigned int page_count;        /* # pages in array or list */
+       struct ceph_pagelist *pagelist; /* instead of pages */
 #ifdef CONFIG_BLOCK
+       unsigned int bio_seg;           /* current bio segment */
        struct bio  *bio;               /* instead of pages/pagelist */
        struct bio  *bio_iter;          /* bio iterator */
-       unsigned int bio_seg;           /* current bio segment */
 #endif /* CONFIG_BLOCK */
        struct ceph_pagelist *trail;    /* the trailing part of the data */
+
+       struct ceph_connection *con;
+       struct list_head list_head;     /* links for connection lists */
+
+       struct kref kref;
        bool front_is_vmalloc;
        bool more_to_follow;
        bool needs_out_seq;
@@ -218,6 +219,9 @@ extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 
 extern void ceph_con_keepalive(struct ceph_connection *con);
 
+extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+                               unsigned int page_count, size_t alignment);
+
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
                                     bool can_fail);
 extern void ceph_msg_kfree(struct ceph_msg *m);
index ce1669f75ca5eb0c1be9245b9cf5088d1664ed08..cec39cb623f0f609b3d53d9577008ad3ca6e3c62 100644 (file)
@@ -2689,6 +2689,17 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+               unsigned int page_count, size_t alignment)
+{
+       /* BUG_ON(msg->pages); */
+       /* BUG_ON(msg->page_count); */
+
+       msg->pages = pages;
+       msg->page_count = page_count;
+       msg->page_alignment = alignment & ~PAGE_MASK;
+}
+EXPORT_SYMBOL(ceph_msg_data_set_pages);
 
 /*
  * construct a new message with given type, size
index 202af14dc6dcaa21fbf6a62b6704e054d627986b..a09d5713407523cbc2fd1238dd249aa53b57f18b 100644 (file)
@@ -1760,11 +1760,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                unsigned int page_count;
 
-               req->r_request->pages = osd_data->pages;
                page_count = calc_pages_for((u64)osd_data->alignment,
                                                (u64)osd_data->length);
-               req->r_request->page_count = page_count;
-               req->r_request->page_alignment = osd_data->alignment;
+               ceph_msg_data_set_pages(req->r_request, osd_data->pages,
+                       page_count, osd_data->alignment);
 #ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                req->r_request->bio = osd_data->bio;
@@ -2135,9 +2134,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                        }
                        page_count = calc_pages_for((u64)osd_data->alignment,
                                                        (u64)osd_data->length);
-                       m->pages = osd_data->pages;
-                       m->page_count = page_count;
-                       m->page_alignment = osd_data->alignment;
+                       ceph_msg_data_set_pages(m, osd_data->pages,
+                               osd_data->num_pages, osd_data->alignment);
 #ifdef CONFIG_BLOCK
                } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                        m->bio = osd_data->bio;