libceph: abstract message data
authorAlex Elder <elder@inktank.com>
Sat, 2 Mar 2013 00:00:16 +0000 (18:00 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:16:55 +0000 (21:16 -0700)
Group the types of message data into an abstract structure with a
type indicator and a union containing fields appropriate to the
type of data it represents.  Use this to represent the pages,
pagelist, bio, and trail in a ceph message.

Verify message data is of type NONE in ceph_msg_data_set_*()
routines.  Since information about message data of type NONE really
should not be interpreted, get rid of the other assertions in those
functions.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index fb2b18a20c13995c3f17cedf543e0e1e8fb7d934..5860dd0c2caf75fce6e8caba1f50d04f5a1c7a4f 100644 (file)
@@ -64,12 +64,55 @@ struct ceph_messenger {
        u32 required_features;
 };
 
-#define ceph_msg_has_pages(m)          ((m)->p.pages != NULL)
-#define ceph_msg_has_pagelist(m)       ((m)->l.pagelist != NULL)
+#define ceph_msg_has_pages(m)          ((m)->p.type == CEPH_MSG_DATA_PAGES)
+#define ceph_msg_has_pagelist(m)       ((m)->l.type == CEPH_MSG_DATA_PAGELIST)
 #ifdef CONFIG_BLOCK
-#define ceph_msg_has_bio(m)            ((m)->b.bio != NULL)
+#define ceph_msg_has_bio(m)            ((m)->b.type == CEPH_MSG_DATA_BIO)
 #endif /* CONFIG_BLOCK */
-#define ceph_msg_has_trail(m)          ((m)->t.trail != NULL)
+#define ceph_msg_has_trail(m)          ((m)->t.type == CEPH_MSG_DATA_PAGELIST)
+
+enum ceph_msg_data_type {
+       CEPH_MSG_DATA_NONE,     /* message contains no data payload */
+       CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
+       CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
+#ifdef CONFIG_BLOCK
+       CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
+#endif /* CONFIG_BLOCK */
+};
+
+static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
+{
+       switch (type) {
+       case CEPH_MSG_DATA_NONE:
+       case CEPH_MSG_DATA_PAGES:
+       case CEPH_MSG_DATA_PAGELIST:
+#ifdef CONFIG_BLOCK
+       case CEPH_MSG_DATA_BIO:
+#endif /* CONFIG_BLOCK */
+               return true;
+       default:
+               return false;
+       }
+}
+
+struct ceph_msg_data {
+       enum ceph_msg_data_type         type;
+       union {
+#ifdef CONFIG_BLOCK
+               struct {
+                       struct bio      *bio_iter;      /* iterator */
+                       struct bio      *bio;
+                       unsigned int    bio_seg;        /* current seg in bio */
+               };
+#endif /* CONFIG_BLOCK */
+               struct {
+                       struct page     **pages;        /* NOT OWNER. */
+                       size_t          length;         /* total # bytes */
+                       unsigned int    alignment;      /* first page */
+               };
+               struct ceph_pagelist    *pagelist;
+       };
+};
 
 /*
  * a single message.  it contains a header (src, dest, message type, etc.),
@@ -83,24 +126,12 @@ struct ceph_msg {
        struct ceph_buffer *middle;
 
        /* data payload */
-       struct {
-               struct page     **pages;        /* NOT OWNER. */
-               size_t          length;         /* # data bytes in array */
-               unsigned int    alignment;      /* first page */
-       } p;
-       struct {
-               struct ceph_pagelist    *pagelist;
-       } l;
+       struct ceph_msg_data    p;      /* pages */
+       struct ceph_msg_data    l;      /* pagelist */
 #ifdef CONFIG_BLOCK
-       struct {
-               struct bio      *bio_iter;      /* iterator */
-               struct bio      *bio;
-               unsigned int    bio_seg;        /* current seg in bio */
-       } b;
+       struct ceph_msg_data    b;      /* bio */
 #endif /* CONFIG_BLOCK */
-       struct {
-               struct ceph_pagelist *trail;    /* trailing part of data */
-       } t;
+       struct ceph_msg_data    t;      /* trail */
 
        struct ceph_connection *con;
        struct list_head list_head;     /* links for connection lists */
index f485455f05a81cde733ba2f37ef6f60a89bee3c2..f256b4b174adb68a80eeb0d8825d2e9ef438ab9f 100644 (file)
@@ -1054,7 +1054,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
        msg_pos->did_page_crc = false;
        if (in_trail) {
                BUG_ON(!ceph_msg_has_trail(msg));
-               list_rotate_left(&msg->t.trail->head);
+               list_rotate_left(&msg->t.pagelist->head);
        } else if (ceph_msg_has_pagelist(msg)) {
                list_rotate_left(&msg->l.pagelist->head);
 #ifdef CONFIG_BLOCK
@@ -1120,7 +1120,7 @@ static int write_partial_message_data(struct ceph_connection *con)
        size_t trail_off = data_len;
 
        if (ceph_msg_has_trail(msg)) {
-               trail_len = msg->t.trail->length;
+               trail_len = msg->t.pagelist->length;
                trail_off -= trail_len;
        }
 
@@ -1149,7 +1149,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                if (in_trail) {
                        BUG_ON(!ceph_msg_has_trail(msg));
                        total_max_write = data_len - msg_pos->data_pos;
-                       page = list_first_entry(&msg->t.trail->head,
+                       page = list_first_entry(&msg->t.pagelist->head,
                                                struct page, lru);
                } else if (ceph_msg_has_pages(msg)) {
                        page = msg->p.pages[msg_pos->page];
@@ -2736,14 +2736,19 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+static void ceph_msg_data_init(struct ceph_msg_data *data)
+{
+       data->type = CEPH_MSG_DATA_NONE;
+}
+
 void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
                size_t length, size_t alignment)
 {
        BUG_ON(!pages);
        BUG_ON(!length);
-       BUG_ON(msg->p.pages);
-       BUG_ON(msg->p.length);
+       BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
 
+       msg->p.type = CEPH_MSG_DATA_PAGES;
        msg->p.pages = pages;
        msg->p.length = length;
        msg->p.alignment = alignment & ~PAGE_MASK;
@@ -2755,8 +2760,9 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 {
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
-       BUG_ON(msg->l.pagelist);
+       BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
 
+       msg->l.type = CEPH_MSG_DATA_PAGELIST;
        msg->l.pagelist = pagelist;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
@@ -2764,8 +2770,9 @@ EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 {
        BUG_ON(!bio);
-       BUG_ON(msg->b.bio);
+       BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
 
+       msg->b.type = CEPH_MSG_DATA_BIO;
        msg->b.bio = bio;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
@@ -2774,9 +2781,10 @@ void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail)
 {
        BUG_ON(!trail);
        BUG_ON(!trail->length);
-       BUG_ON(msg->t.trail);
+       BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
 
-       msg->t.trail = trail;
+       msg->t.type = CEPH_MSG_DATA_PAGELIST;
+       msg->t.pagelist = trail;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_trail);
 
@@ -2800,6 +2808,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
        INIT_LIST_HEAD(&m->list_head);
        kref_init(&m->kref);
 
+       ceph_msg_data_init(&m->p);
+       ceph_msg_data_init(&m->l);
+       ceph_msg_data_init(&m->b);
+       ceph_msg_data_init(&m->t);
+
        /* front */
        m->front_max = front_len;
        if (front_len) {
@@ -2965,7 +2978,7 @@ void ceph_msg_last_put(struct kref *kref)
        }
 
        if (ceph_msg_has_trail(m))
-               m->t.trail = NULL;
+               m->t.pagelist = NULL;
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);