libceph: define and use in_msg_pos_next()
authorAlex Elder <elder@inktank.com>
Sat, 9 Mar 2013 00:51:04 +0000 (18:51 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:16:33 +0000 (21:16 -0700)
Define a new function in_msg_pos_next() to match out_msg_pos_next(),
and use it in place of code at the end of read_partial_message_pages()
and read_partial_message_bio().

Note that the page number is incremented and offset reset under
slightly different conditions from before.  The result is
equivalent, however, as explained below.

Each time an incoming message is going to arrive, we find out how
much room is left--not surpassing the current page--and provide that
as the number of bytes to receive.  So the amount we'll use is the
lesser of:  all that's left of the entire request; and all that's
left in the current page.

If we received exactly how many were requested, we either reached
the end of the request or the end of the page.  In the first case,
we're done, in the second, we move onto the next page in the array.

In all cases but (possibly) on the last page, after adding the
number of bytes received, page_pos == PAGE_SIZE.  On the last page,
it doesn't really matter whether we increment the page number and
reset the page position, because we're done and we won't come back
here again.  The code previously skipped over that last case,
basically.  The new code handles that case the same as the others,
incrementing and resetting.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
net/ceph/messenger.c

index 2017b8833baab9f5b02fbe918d5f5e91e693172c..fb5f6e7d57a3b0e247abca0657e26a5aa76cd0db 100644 (file)
@@ -1052,6 +1052,28 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 #endif
 }
 
+static void in_msg_pos_next(struct ceph_connection *con, size_t len,
+                               size_t received)
+{
+       struct ceph_msg *msg = con->in_msg;
+
+       BUG_ON(!msg);
+       BUG_ON(!received);
+
+       con->in_msg_pos.data_pos += received;
+       con->in_msg_pos.page_pos += received;
+       if (received < len)
+               return;
+
+       BUG_ON(received != len);
+       con->in_msg_pos.page_pos = 0;
+       con->in_msg_pos.page++;
+#ifdef CONFIG_BLOCK
+       if (msg->bio)
+               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif /* CONFIG_BLOCK */
+}
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -1789,6 +1811,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
                                      unsigned int data_len, bool do_datacrc)
 {
+       struct page *page;
        void *p;
        int ret;
        int left;
@@ -1797,22 +1820,18 @@ static int read_partial_message_pages(struct ceph_connection *con,
                   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
        /* (page) data */
        BUG_ON(pages == NULL);
-       p = kmap(pages[con->in_msg_pos.page]);
-       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                              left);
+       page = pages[con->in_msg_pos.page];
+       p = kmap(page);
+       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
        if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
-       kunmap(pages[con->in_msg_pos.page]);
+       kunmap(page);
        if (ret <= 0)
                return ret;
-       con->in_msg_pos.data_pos += ret;
-       con->in_msg_pos.page_pos += ret;
-       if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-               con->in_msg_pos.page_pos = 0;
-               con->in_msg_pos.page++;
-       }
+
+       in_msg_pos_next(con, left, ret);
 
        return ret;
 }
@@ -1823,32 +1842,30 @@ static int read_partial_message_bio(struct ceph_connection *con,
 {
        struct ceph_msg *msg = con->in_msg;
        struct bio_vec *bv;
+       struct page *page;
        void *p;
        int ret, left;
 
        BUG_ON(!msg);
        BUG_ON(!msg->bio_iter);
        bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+
        left = min((int)(data_len - con->in_msg_pos.data_pos),
                   (int)(bv->bv_len - con->in_msg_pos.page_pos));
 
-       p = kmap(bv->bv_page) + bv->bv_offset;
+       page = bv->bv_page;
+       p = kmap(page) + bv->bv_offset;
 
-       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                              left);
+       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
        if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
-       kunmap(bv->bv_page);
+       kunmap(page);
        if (ret <= 0)
                return ret;
-       con->in_msg_pos.data_pos += ret;
-       con->in_msg_pos.page_pos += ret;
-       if (con->in_msg_pos.page_pos == bv->bv_len) {
-               con->in_msg_pos.page_pos = 0;
-               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-       }
+
+       in_msg_pos_next(con, left, ret);
 
        return ret;
 }