libceph: start defining message data cursor
authorAlex Elder <elder@inktank.com>
Thu, 7 Mar 2013 05:39:39 +0000 (23:39 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:16:56 +0000 (21:16 -0700)
This patch lays out the foundation for using generic routines to
manage processing items of message data.

For simplicity, we'll start with just the trail portion of a
message, because it stands alone and is only present for outgoing
data.

First some basic concepts.  We'll use the term "data item" to
represent one of the ceph_msg_data structures associated with a
message.  There are currently four of those, with single-letter
field names p, l, b, and t.  A data item is further broken into
"pieces" which always lie in a single page.  A data item will
include a "cursor" that will track state as the memory defined by
the item is consumed by sending data from or receiving data into it.

We define three routines to manipulate a data item's cursor: the
"init" routine; the "next" routine; and the "advance" routine.  The
"init" routine initializes the cursor so it points at the beginning
of the first piece in the item.  The "next" routine returns the
page, page offset, and length (limited by both the page and item
size) of the next unconsumed piece in the item.  It also indicates
to the caller whether the piece being returned is the last one in
the data item.

The "advance" routine consumes the requested number of bytes in the
item (advancing the cursor).  This is used to record the number of
bytes from the current piece that were actually sent or received by
the network code.  It returns an indication of whether the result
means the current piece has been fully consumed.  This is used by
the message send code to determine whether it should calculate the
CRC for the next piece processed.

The trail of a message is implemented as a ceph pagelist.  The
routines defined for it will be usable for non-trail pagelist data
as well.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index 5860dd0c2caf75fce6e8caba1f50d04f5a1c7a4f..14862438faffacacceef32a6676ea3d7209ffa47 100644 (file)
@@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
        }
 }
 
+struct ceph_msg_data_cursor {
+       bool            last_piece;     /* now at last piece of data item */
+       struct page     *page;          /* current page in pagelist */
+       size_t          offset;         /* pagelist bytes consumed */
+};
+
 struct ceph_msg_data {
        enum ceph_msg_data_type         type;
        union {
@@ -112,6 +118,7 @@ struct ceph_msg_data {
                };
                struct ceph_pagelist    *pagelist;
        };
+       struct ceph_msg_data_cursor     cursor;         /* pagelist only */
 };
 
 /*
index f256b4b174adb68a80eeb0d8825d2e9ef438ab9f..b978cf8b27fffc7fb7cbc0a25db7e223610278db 100644 (file)
@@ -21,6 +21,9 @@
 #include <linux/ceph/pagelist.h>
 #include <linux/export.h>
 
+#define list_entry_next(pos, member)                                   \
+       list_entry(pos->member.next, typeof(*pos), member)
+
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
 }
 #endif
 
+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page.  The network layer might not
+ * consume an entire piece at once.  A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece.  It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_pagelist *pagelist;
+       struct page *page;
+
+       if (data->type != CEPH_MSG_DATA_PAGELIST)
+               return;
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+       if (!pagelist->length)
+               return;         /* pagelist can be assigned but empty */
+
+       BUG_ON(list_empty(&pagelist->head));
+       page = list_first_entry(&pagelist->head, struct page, lru);
+
+       cursor->page = page;
+       cursor->offset = 0;
+       cursor->last_piece = pagelist->length <= PAGE_SIZE;
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
+                                               size_t *page_offset,
+                                               size_t *length,
+                                               bool *last_piece)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_pagelist *pagelist;
+       size_t piece_end;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+
+       BUG_ON(!cursor->page);
+       BUG_ON(cursor->offset >= pagelist->length);
+
+       *last_piece = cursor->last_piece;
+       if (*last_piece) {
+               /* pagelist offset is always 0 */
+               piece_end = pagelist->length & ~PAGE_MASK;
+               if (!piece_end)
+                       piece_end = PAGE_SIZE;
+       } else {
+               piece_end = PAGE_SIZE;
+       }
+       *page_offset = cursor->offset & ~PAGE_MASK;
+       *length = piece_end - *page_offset;
+
+       return data->cursor.page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * (the next page) of the pagelist.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_pagelist *pagelist;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+       BUG_ON(!cursor->page);
+       BUG_ON(cursor->offset + bytes > pagelist->length);
+       BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+       /* Advance the cursor offset */
+
+       cursor->offset += bytes;
+       /* pagelist offset is always 0 */
+       if (!bytes || cursor->offset & ~PAGE_MASK)
+               return false;   /* more bytes to process in the current page */
+
+       /* Move on to the next page */
+
+       BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+       cursor->page = list_entry_next(cursor->page, lru);
+
+       /* cursor offset is at page boundary; pagelist offset is always 0 */
+       if (pagelist->length - cursor->offset <= PAGE_SIZE)
+               cursor->last_piece = true;
+
+       return true;
+}
+
 static void prepare_message_data(struct ceph_msg *msg,
                                struct ceph_msg_pos *msg_pos)
 {
@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
                init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
 #endif
        msg_pos->data_pos = 0;
+
+       /* If there's a trail, initialize its cursor */
+
+       if (ceph_msg_has_trail(msg))
+               ceph_msg_data_cursor_init(&msg->t);
+
        msg_pos->did_page_crc = false;
 }
 
@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 
        msg_pos->data_pos += sent;
        msg_pos->page_pos += sent;
+       if (in_trail) {
+               bool need_crc;
+
+               need_crc = ceph_msg_data_advance(&msg->t, sent);
+               BUG_ON(need_crc && sent != len);
+       }
        if (sent < len)
                return;
 
@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
        msg_pos->page_pos = 0;
        msg_pos->page++;
        msg_pos->did_page_crc = false;
-       if (in_trail) {
-               BUG_ON(!ceph_msg_has_trail(msg));
-               list_rotate_left(&msg->t.pagelist->head);
-       } else if (ceph_msg_has_pagelist(msg)) {
+       if (ceph_msg_has_pagelist(msg)) {
                list_rotate_left(&msg->l.pagelist->head);
 #ifdef CONFIG_BLOCK
        } else if (ceph_msg_has_bio(msg)) {
@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con)
                size_t length;
                int max_write = PAGE_SIZE;
                int bio_offset = 0;
+               bool use_cursor = false;
+               bool last_piece = true; /* preserve existing behavior */
 
                in_trail = in_trail || msg_pos->data_pos >= trail_off;
                if (!in_trail)
@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con)
 
                if (in_trail) {
                        BUG_ON(!ceph_msg_has_trail(msg));
-                       total_max_write = data_len - msg_pos->data_pos;
-                       page = list_first_entry(&msg->t.pagelist->head,
-                                               struct page, lru);
+                       use_cursor = true;
+                       page = ceph_msg_data_next(&msg->t, &page_offset,
+                                                       &length, &last_piece);
                } else if (ceph_msg_has_pages(msg)) {
                        page = msg->p.pages[msg_pos->page];
                } else if (ceph_msg_has_pagelist(msg)) {
@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con)
                } else {
                        page = zero_page;
                }
-               length = min_t(int, max_write - msg_pos->page_pos,
-                           total_max_write);
+               if (!use_cursor)
+                       length = min_t(int, max_write - msg_pos->page_pos,
+                                           total_max_write);
 
                page_offset = msg_pos->page_pos + bio_offset;
                if (do_datacrc && !msg_pos->did_page_crc) {
@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                        msg_pos->did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-                                     length, true);
+                                     length, last_piece);
                if (ret <= 0)
                        goto out;