ceph: use single osd op reply msg
authorSage Weil <sage@newdream.net>
Mon, 1 Mar 2010 21:02:00 +0000 (13:02 -0800)
committerSage Weil <sage@newdream.net>
Mon, 1 Mar 2010 23:20:02 +0000 (15:20 -0800)
Use a single ceph_msg for the osd reply, even when we are getting multiple
replies.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/osd_client.c
fs/ceph/osd_client.h

index 3a631f27cc9e32752e5850be3792aee054fa503b..ffe1f4064ccd1965cce222c4ef7814f1ae6b2725 100644 (file)
@@ -13,7 +13,8 @@
 #include "decode.h"
 #include "auth.h"
 
-#define OSD_REPLY_RESERVE_FRONT_LEN    512
+#define OSD_OP_FRONT_LEN       4096
+#define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
 
@@ -75,17 +76,6 @@ static void calc_layout(struct ceph_osd_client *osdc,
             req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
-static void remove_replies(struct ceph_osd_request *req)
-{
-       int i;
-       int max = ARRAY_SIZE(req->replies);
-
-       for (i=0; i<max; i++) {
-               if (req->replies[i])
-                       ceph_msg_put(req->replies[i]);
-       }
-}
-
 /*
  * requests
  */
@@ -99,7 +89,6 @@ void ceph_osdc_release_request(struct kref *kref)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
-       remove_replies(req);
        if (req->r_con_filling_msg) {
                dout("release_request revoking pages %p from con %p\n",
                     req->r_pages, req->r_con_filling_msg);
@@ -117,60 +106,6 @@ void ceph_osdc_release_request(struct kref *kref)
                kfree(req);
 }
 
-static int alloc_replies(struct ceph_osd_request *req, int num_reply)
-{
-       int i;
-       int max = ARRAY_SIZE(req->replies);
-
-       BUG_ON(num_reply > max);
-
-       for (i=0; i<num_reply; i++) {
-               req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
-               if (IS_ERR(req->replies[i])) {
-                       int j;
-                       int err = PTR_ERR(req->replies[i]);
-                       for (j = 0; j<=i; j++) {
-                               ceph_msg_put(req->replies[j]);
-                       }
-                       return err;
-               }
-       }
-
-       for (; i<max; i++) {
-               req->replies[i] = NULL;
-       }
-
-       req->cur_reply = 0;
-
-       return 0;
-}
-
-static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
-                                      struct ceph_osd_request *req,
-                                      int front_len)
-{
-       struct ceph_msg *reply;
-       if (req->r_con_filling_msg) {
-               dout("revoking reply msg %p from old con %p\n", req->r_reply,
-                    req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
-               ceph_con_put(req->r_con_filling_msg);
-               req->cur_reply = 0;
-       }
-       reply = req->replies[req->cur_reply];
-       if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
-               /* maybe we can allocate it now? */
-               reply = ceph_msg_new(0, front_len, 0, 0, NULL);
-               if (!reply || IS_ERR(reply)) {
-                       pr_err(" reply alloc failed, front_len=%d\n", front_len);
-                       return ERR_PTR(-ENOMEM);
-               }
-       }
-       req->r_con_filling_msg = ceph_con_get(con);
-       req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
-       return ceph_msg_get(reply);
-}
-
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -201,7 +136,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        void *p;
        int num_op = 1 + do_sync;
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-       int err, i;
+       int i;
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
@@ -212,13 +147,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
-       err = alloc_replies(req, num_reply);
-       if (err) {
-               ceph_osdc_put_request(req);
-               return ERR_PTR(-ENOMEM);
-       }
-       req->r_num_prealloc_reply = num_reply;
-
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        kref_init(&req->r_kref);
@@ -229,7 +157,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 
-       /* create message; allow space for oid */
+       /* create reply message */
+       if (use_mempool)
+               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+       else
+               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
+                                  OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
+       if (IS_ERR(msg)) {
+               ceph_osdc_put_request(req);
+               return ERR_PTR(PTR_ERR(msg));
+       }
+       req->r_reply = msg;
+
+       /* create request message; allow space for oid */
        msg_size += 40;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
@@ -819,21 +759,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
         * avoid a (safe but slower) revoke later.
         */
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
-               dout(" got pages, dropping con_filling_msg ref %p\n", con);
+               dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
                ceph_con_put(con);
        }
 
-       if (req->r_reply) {
-               /*
-                * once we see the message has been received, we don't
-                * need a ref (which is only needed for revoking
-                * pages)
-                */
-               ceph_msg_put(req->r_reply);
-               req->r_reply = NULL;
-       }
-
        if (!req->r_got_reply) {
                unsigned bytes;
 
@@ -1249,11 +1179,17 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (!osdc->req_mempool)
                goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
        if (err < 0)
                goto out_mempool;
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+                               OSD_OPREPLY_FRONT_LEN, 10, true);
+       if (err < 0)
+               goto out_msgpool;
        return 0;
 
+out_msgpool:
+       ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool:
        mempool_destroy(osdc->req_mempool);
 out:
@@ -1271,6 +1207,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        remove_old_osds(osdc, 1);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
+       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
 /*
@@ -1405,16 +1342,29 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (!req) {
                *skip = 1;
                m = NULL;
-               pr_info("alloc_msg unknown tid %llu from osd%d\n", tid,
+               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
                        osd->o_osd);
                goto out;
        }
-       m = __get_next_reply(con, req, front);
-       if (!m || IS_ERR(m)) {
-               *skip = 1;
-               goto out;
+
+       if (req->r_con_filling_msg) {
+               dout("get_reply revoking msg %p from old con %p\n",
+                    req->r_reply, req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
        }
 
+       if (front > req->r_reply->front.iov_len) {
+               pr_warning("get_reply front %d > preallocated %d\n",
+                          front, (int)req->r_reply->front.iov_len);
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
+               if (IS_ERR(m))
+                       goto out;
+               ceph_msg_put(req->r_reply);
+               req->r_reply = m;
+       }
+       m = ceph_msg_get(req->r_reply);
+
        if (data_len > 0) {
                err = __prepare_pages(con, hdr, req, tid, m);
                if (err < 0) {
@@ -1424,6 +1374,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                }
        }
        *skip = 0;
+       req->r_con_filling_msg = ceph_con_get(con);
+       dout("get_reply tid %lld %p\n", tid, m);
 
 out:
        mutex_unlock(&osdc->request_mutex);
index 70f31b61f02cd1445816e2a981393e8ad7e532ac..f256eba6fe7aa76da74a12cae48ba0b69ca66a61 100644 (file)
@@ -53,7 +53,6 @@ struct ceph_osd_request {
        int               r_flags;     /* any additional flags for the osd */
        u32               r_sent;      /* >0 if r_request is sending/sent */
        int               r_got_reply;
-       int               r_num_prealloc_reply;
 
        struct ceph_osd_client *r_osdc;
        struct kref       r_kref;
@@ -77,9 +76,6 @@ struct ceph_osd_request {
        struct page     **r_pages;            /* pages for data payload */
        int               r_pages_from_pool;
        int               r_own_pages;        /* if true, i own page list */
-
-       struct ceph_msg   *replies[2];
-       int               cur_reply;
 };
 
 struct ceph_osd_client {
@@ -106,6 +102,7 @@ struct ceph_osd_client {
        mempool_t              *req_mempool;
 
        struct ceph_msgpool     msgpool_op;
+       struct ceph_msgpool     msgpool_op_reply;
 };
 
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,