Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing
authorAlex Elder <elder@inktank.com>
Wed, 30 Jan 2013 13:54:34 +0000 (07:54 -0600)
committerAlex Elder <elder@inktank.com>
Wed, 30 Jan 2013 13:54:34 +0000 (07:54 -0600)
14 files changed:
drivers/block/rbd.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/ioctl.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
include/linux/ceph/ceph_features.h
include/linux/ceph/decode.h
include/linux/ceph/osd_client.h
include/linux/ceph/osdmap.h
include/linux/crush/crush.h
net/ceph/crush/mapper.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index 89576a0b3f2ed5b099ecf7ce675104d282b0dd48..668936381ab0a59f81f6658705ef5e2ced80f441 100644 (file)
 #define        SECTOR_SHIFT    9
 #define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
 
-/* It might be useful to have this defined elsewhere too */
+/* It might be useful to have these defined elsewhere */
 
-#define        U64_MAX ((u64) (~0ULL))
+#define        U8_MAX  ((u8)   (~0U))
+#define        U16_MAX ((u16)  (~0U))
+#define        U32_MAX ((u32)  (~0U))
+#define        U64_MAX ((u64)  (~0ULL))
 
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 
 #define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
-#define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
@@ -93,8 +95,6 @@
 #define DEV_NAME_LEN           32
 #define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_READ_ONLY_DEFAULT          false
-
 /*
  * block device image metadata (in-memory version)
  */
@@ -119,16 +119,33 @@ struct rbd_image_header {
  * An rbd image specification.
  *
  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
- * identify an image.
+ * identify an image.  Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name.  For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up.  For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image.  This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
  */
 struct rbd_spec {
        u64             pool_id;
        char            *pool_name;
 
        char            *image_id;
-       size_t          image_id_len;
        char            *image_name;
-       size_t          image_name_len;
 
        u64             snap_id;
        char            *snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
        struct kref     kref;
 };
 
-struct rbd_options {
-       bool    read_only;
-};
-
 /*
  * an instance of the client.  multiple devices may share an rbd client.
  */
@@ -154,7 +167,7 @@ struct rbd_client {
  */
 struct rbd_req_status {
        int done;
-       int rc;
+       s32 rc;
        u64 bytes;
 };
 
@@ -212,11 +225,13 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
-       bool                    exists;
+       atomic_t                exists;
        struct rbd_spec         *spec;
 
        char                    *header_name;
 
+       struct ceph_file_layout layout;
+
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;
 
@@ -277,6 +292,33 @@ static struct device rbd_root_dev = {
        .release =      rbd_root_dev_release,
 };
 
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
+{
+       struct va_format vaf;
+       va_list args;
+
+       va_start(args, fmt);
+       vaf.fmt = fmt;
+       vaf.va = &args;
+
+       if (!rbd_dev)
+               printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+       else if (rbd_dev->disk)
+               printk(KERN_WARNING "%s: %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+       else if (rbd_dev->spec && rbd_dev->spec->image_name)
+               printk(KERN_WARNING "%s: image %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+       else if (rbd_dev->spec && rbd_dev->spec->image_id)
+               printk(KERN_WARNING "%s: id %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+       else    /* punt */
+               printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+                       RBD_DRV_NAME, rbd_dev, &vaf);
+       va_end(args);
+}
+
 #ifdef RBD_DEBUG
 #define rbd_assert(expr)                                               \
                if (unlikely(!(expr))) {                                \
@@ -426,6 +468,12 @@ static match_table_t rbd_opts_tokens = {
        {-1, NULL}
 };
 
+struct rbd_options {
+       bool    read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT  false
+
 static int parse_rbd_opts_token(char *c, void *private)
 {
        struct rbd_options *rbd_opts = private;
@@ -707,7 +755,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
-       rbd_dev->exists = true;
+       atomic_set(&rbd_dev->exists, 1);
 done:
        return ret;
 }
@@ -724,7 +772,7 @@ static void rbd_header_free(struct rbd_image_header *header)
        header->snapc = NULL;
 }
 
-static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
        char *name;
        u64 segment;
@@ -772,6 +820,7 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
 {
        u64 start_seg;
        u64 end_seg;
+       u64 result;
 
        if (!len)
                return 0;
@@ -781,7 +830,11 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;
 
-       return end_seg - start_seg + 1;
+       result = end_seg - start_seg + 1;
+       if (result > (u64) INT_MAX)
+               return -ERANGE;
+
+       return (int) result;
 }
 
 /*
@@ -949,8 +1002,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
                unsigned int bi_size;
                struct bio *bio;
 
-               if (!bi)
+               if (!bi) {
+                       rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
+               }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
@@ -976,44 +1031,84 @@ out_err:
        return NULL;
 }
 
-/*
- * helpers for osd request op vectors.
- */
-static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
-                                       int opcode, u32 payload_len)
+struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
 {
-       struct ceph_osd_req_op *ops;
+       struct ceph_osd_req_op *op;
+       va_list args;
+       size_t size;
 
-       ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
-       if (!ops)
+       op = kzalloc(sizeof (*op), GFP_NOIO);
+       if (!op)
                return NULL;
+       op->op = opcode;
+       va_start(args, opcode);
+       switch (opcode) {
+       case CEPH_OSD_OP_READ:
+       case CEPH_OSD_OP_WRITE:
+               /* rbd_osd_req_op_create(READ, offset, length) */
+               /* rbd_osd_req_op_create(WRITE, offset, length) */
+               op->extent.offset = va_arg(args, u64);
+               op->extent.length = va_arg(args, u64);
+               if (opcode == CEPH_OSD_OP_WRITE)
+                       op->payload_len = op->extent.length;
+               break;
+       case CEPH_OSD_OP_CALL:
+               /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
+               op->cls.class_name = va_arg(args, char *);
+               size = strlen(op->cls.class_name);
+               rbd_assert(size <= (size_t) U8_MAX);
+               op->cls.class_len = size;
+               op->payload_len = size;
+
+               op->cls.method_name = va_arg(args, char *);
+               size = strlen(op->cls.method_name);
+               rbd_assert(size <= (size_t) U8_MAX);
+               op->cls.method_len = size;
+               op->payload_len += size;
+
+               op->cls.argc = 0;
+               op->cls.indata = va_arg(args, void *);
+               size = va_arg(args, size_t);
+               rbd_assert(size <= (size_t) U32_MAX);
+               op->cls.indata_len = (u32) size;
+               op->payload_len += size;
+               break;
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
+               /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
+               op->watch.cookie = va_arg(args, u64);
+               op->watch.ver = va_arg(args, u64);
+               op->watch.ver = cpu_to_le64(op->watch.ver);
+               if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
+                       op->watch.flag = (u8) 1;
+               break;
+       default:
+               rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
+               kfree(op);
+               op = NULL;
+               break;
+       }
+       va_end(args);
 
-       ops[0].op = opcode;
-
-       /*
-        * op extent offset and length will be set later on
-        * in calc_raw_layout()
-        */
-       ops[0].payload_len = payload_len;
-
-       return ops;
+       return op;
 }
 
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
 {
-       kfree(ops);
+       kfree(op);
 }
 
 static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
-                                  int ret, u64 len)
+                                  s32 ret, u64 len)
 {
        struct request_queue *q;
        int min, max, i;
 
        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
-            coll, index, ret, (unsigned long long) len);
+            coll, index, (int)ret, (unsigned long long)len);
 
        if (!rq)
                return;
@@ -1034,7 +1129,7 @@ static void rbd_coll_end_req_index(struct request *rq,
                max++;
 
        for (i = min; i<max; i++) {
-               __blk_end_request(rq, coll->status[i].rc,
+               __blk_end_request(rq, (int)coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
@@ -1042,10 +1137,12 @@ static void rbd_coll_end_req_index(struct request *rq,
        spin_unlock_irq(q->queue_lock);
 }
 
-static void rbd_coll_end_req(struct rbd_request *req,
-                            int ret, u64 len)
+static void rbd_coll_end_req(struct rbd_request *rbd_req,
+                            s32 ret, u64 len)
 {
-       rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+       rbd_coll_end_req_index(rbd_req->rq,
+                               rbd_req->coll, rbd_req->coll_index,
+                               ret, len);
 }
 
 /*
@@ -1060,117 +1157,102 @@ static int rbd_do_request(struct request *rq,
                          struct page **pages,
                          int num_pages,
                          int flags,
-                         struct ceph_osd_req_op *ops,
+                         struct ceph_osd_req_op *op,
                          struct rbd_req_coll *coll,
                          int coll_index,
-                         void (*rbd_cb)(struct ceph_osd_request *req,
-                                        struct ceph_msg *msg),
-                         struct ceph_osd_request **linger_req,
+                         void (*rbd_cb)(struct ceph_osd_request *,
+                                        struct ceph_msg *),
                          u64 *ver)
 {
-       struct ceph_osd_request *req;
-       struct ceph_file_layout *layout;
-       int ret;
-       u64 bno;
-       struct timespec mtime = CURRENT_TIME;
-       struct rbd_request *req_data;
-       struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;
-
-       req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
-       if (!req_data) {
-               if (coll)
-                       rbd_coll_end_req_index(rq, coll, coll_index,
-                                              -ENOMEM, len);
-               return -ENOMEM;
-       }
-
-       if (coll) {
-               req_data->coll = coll;
-               req_data->coll_index = coll_index;
-       }
+       struct ceph_osd_request *osd_req;
+       struct rbd_request *rbd_req = NULL;
+       struct timespec mtime = CURRENT_TIME;
+       int ret;
 
        dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
                object_name, (unsigned long long) ofs,
                (unsigned long long) len, coll, coll_index);
 
        osdc = &rbd_dev->rbd_client->client->osdc;
-       req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
-                                       false, GFP_NOIO, pages, bio);
-       if (!req) {
-               ret = -ENOMEM;
-               goto done_pages;
-       }
-
-       req->r_callback = rbd_cb;
+       osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
+       if (!osd_req)
+               return -ENOMEM;
 
-       req_data->rq = rq;
-       req_data->bio = bio;
-       req_data->pages = pages;
-       req_data->len = len;
+       osd_req->r_flags = flags;
+       osd_req->r_pages = pages;
+       if (bio) {
+               osd_req->r_bio = bio;
+               bio_get(osd_req->r_bio);
+       }
 
-       req->r_priv = req_data;
+       if (coll) {
+               ret = -ENOMEM;
+               rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
+               if (!rbd_req)
+                       goto done_osd_req;
+
+               rbd_req->rq = rq;
+               rbd_req->bio = bio;
+               rbd_req->pages = pages;
+               rbd_req->len = len;
+               rbd_req->coll = coll;
+               rbd_req->coll_index = coll_index;
+       }
 
-       reqhead = req->r_request->front.iov_base;
-       reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+       osd_req->r_callback = rbd_cb;
+       osd_req->r_priv = rbd_req;
 
-       strncpy(req->r_oid, object_name, sizeof(req->r_oid));
-       req->r_oid_len = strlen(req->r_oid);
+       strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
+       osd_req->r_oid_len = strlen(osd_req->r_oid);
 
-       layout = &req->r_file_layout;
-       memset(layout, 0, sizeof(*layout));
-       layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_stripe_count = cpu_to_le32(1);
-       layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
-       ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-                                  req, ops);
-       rbd_assert(ret == 0);
+       osd_req->r_file_layout = rbd_dev->layout;       /* struct */
+       osd_req->r_num_pages = calc_pages_for(ofs, len);
+       osd_req->r_page_alignment = ofs & ~PAGE_MASK;
 
-       ceph_osdc_build_request(req, ofs, &len,
-                               ops,
-                               snapc,
-                               &mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(osd_req, ofs, len, 1, op,
+                               snapc, snapid, &mtime);
 
-       if (linger_req) {
-               ceph_osdc_set_request_linger(osdc, req);
-               *linger_req = req;
+       if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
+               ceph_osdc_set_request_linger(osdc, osd_req);
+               rbd_dev->watch_request = osd_req;
        }
 
-       ret = ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_start_request(osdc, osd_req, false);
        if (ret < 0)
                goto done_err;
 
        if (!rbd_cb) {
-               ret = ceph_osdc_wait_request(osdc, req);
+               u64 version;
+
+               ret = ceph_osdc_wait_request(osdc, osd_req);
+               version = le64_to_cpu(osd_req->r_reassert_version.version);
                if (ver)
-                       *ver = le64_to_cpu(req->r_reassert_version.version);
-               dout("reassert_ver=%llu\n",
-                       (unsigned long long)
-                               le64_to_cpu(req->r_reassert_version.version));
-               ceph_osdc_put_request(req);
+                       *ver = version;
+               dout("reassert_ver=%llu\n", (unsigned long long) version);
+               ceph_osdc_put_request(osd_req);
        }
        return ret;
 
 done_err:
-       bio_chain_put(req_data->bio);
-       ceph_osdc_put_request(req);
-done_pages:
-       rbd_coll_end_req(req_data, ret, len);
-       kfree(req_data);
+       if (bio)
+               bio_chain_put(osd_req->r_bio);
+       kfree(rbd_req);
+done_osd_req:
+       ceph_osdc_put_request(osd_req);
+
        return ret;
 }
 
 /*
  * Ceph osd op callback
  */
-static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
 {
-       struct rbd_request *req_data = req->r_priv;
+       struct rbd_request *rbd_req = osd_req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
-       __s32 rc;
+       s32 rc;
        u64 bytes;
        int read_op;
 
@@ -1178,68 +1260,66 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
-       rc = le32_to_cpu(replyhead->result);
+       rc = (s32)le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 
        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
                (unsigned long long) bytes, read_op, (int) rc);
 
-       if (rc == -ENOENT && read_op) {
-               zero_bio_chain(req_data->bio, 0);
+       if (rc == (s32)-ENOENT && read_op) {
+               zero_bio_chain(rbd_req->bio, 0);
                rc = 0;
-       } else if (rc == 0 && read_op && bytes < req_data->len) {
-               zero_bio_chain(req_data->bio, bytes);
-               bytes = req_data->len;
+       } else if (rc == 0 && read_op && bytes < rbd_req->len) {
+               zero_bio_chain(rbd_req->bio, bytes);
+               bytes = rbd_req->len;
        }
 
-       rbd_coll_end_req(req_data, rc, bytes);
+       rbd_coll_end_req(rbd_req, rc, bytes);
 
-       if (req_data->bio)
-               bio_chain_put(req_data->bio);
+       if (rbd_req->bio)
+               bio_chain_put(rbd_req->bio);
 
-       ceph_osdc_put_request(req);
-       kfree(req_data);
+       ceph_osdc_put_request(osd_req);
+       kfree(rbd_req);
 }
 
-static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
+                               struct ceph_msg *msg)
 {
-       ceph_osdc_put_request(req);
+       ceph_osdc_put_request(osd_req);
 }
 
 /*
  * Do a synchronous ceph osd operation
  */
 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
-                          struct ceph_snap_context *snapc,
-                          u64 snapid,
                           int flags,
-                          struct ceph_osd_req_op *ops,
+                          struct ceph_osd_req_op *op,
                           const char *object_name,
                           u64 ofs, u64 inbound_size,
                           char *inbound,
-                          struct ceph_osd_request **linger_req,
                           u64 *ver)
 {
        int ret;
        struct page **pages;
        int num_pages;
 
-       rbd_assert(ops != NULL);
+       rbd_assert(op != NULL);
 
        num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
 
-       ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
+       ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
-                         ops,
+                         op,
                          NULL, 0,
                          NULL,
-                         linger_req, ver);
+                         ver);
        if (ret < 0)
                goto done;
 
@@ -1262,12 +1342,11 @@ static int rbd_do_op(struct request *rq,
                     struct rbd_req_coll *coll,
                     int coll_index)
 {
-       char *seg_name;
+       const char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
-       struct ceph_osd_req_op *ops;
-       u32 payload_len;
+       struct ceph_osd_req_op *op;
        int opcode;
        int flags;
        u64 snapid;
@@ -1282,18 +1361,16 @@ static int rbd_do_op(struct request *rq,
                opcode = CEPH_OSD_OP_WRITE;
                flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
                snapid = CEPH_NOSNAP;
-               payload_len = seg_len;
        } else {
                opcode = CEPH_OSD_OP_READ;
                flags = CEPH_OSD_FLAG_READ;
-               snapc = NULL;
+               rbd_assert(!snapc);
                snapid = rbd_dev->spec->snap_id;
-               payload_len = 0;
        }
 
        ret = -ENOMEM;
-       ops = rbd_create_rw_ops(1, opcode, payload_len);
-       if (!ops)
+       op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
+       if (!op)
                goto done;
 
        /* we've taken care of segment sizes earlier when we
@@ -1306,11 +1383,13 @@ static int rbd_do_op(struct request *rq,
                             bio,
                             NULL, 0,
                             flags,
-                            ops,
+                            op,
                             coll, coll_index,
-                            rbd_req_cb, 0, NULL);
-
-       rbd_destroy_ops(ops);
+                            rbd_req_cb, NULL);
+       if (ret < 0)
+               rbd_coll_end_req_index(rq, coll, coll_index,
+                                       (s32)ret, seg_len);
+       rbd_osd_req_op_destroy(op);
 done:
        kfree(seg_name);
        return ret;
@@ -1320,24 +1399,21 @@ done:
  * Request sync osd read
  */
 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
-                         u64 snapid,
                          const char *object_name,
                          u64 ofs, u64 len,
                          char *buf,
                          u64 *ver)
 {
-       struct ceph_osd_req_op *ops;
+       struct ceph_osd_req_op *op;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
-       if (!ops)
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
+       if (!op)
                return -ENOMEM;
 
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              snapid,
-                              CEPH_OSD_FLAG_READ,
-                              ops, object_name, ofs, len, buf, NULL, ver);
-       rbd_destroy_ops(ops);
+       ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
+                              op, object_name, ofs, len, buf, ver);
+       rbd_osd_req_op_destroy(op);
 
        return ret;
 }
@@ -1349,26 +1425,23 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
 {
-       struct ceph_osd_req_op *ops;
+       struct ceph_osd_req_op *op;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
-       if (!ops)
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+       if (!op)
                return -ENOMEM;
 
-       ops[0].watch.ver = cpu_to_le64(ver);
-       ops[0].watch.cookie = notify_id;
-       ops[0].watch.flag = 0;
-
        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          rbd_dev->header_name, 0, 0, NULL,
                          NULL, 0,
                          CEPH_OSD_FLAG_READ,
-                         ops,
+                         op,
                          NULL, 0,
-                         rbd_simple_req_cb, 0, NULL);
+                         rbd_simple_req_cb, NULL);
+
+       rbd_osd_req_op_destroy(op);
 
-       rbd_destroy_ops(ops);
        return ret;
 }
 
@@ -1386,83 +1459,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
                (unsigned int) opcode);
        rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
-               pr_warning(RBD_DRV_NAME "%d got notification but failed to "
-                          " update snaps: %d\n", rbd_dev->major, rc);
+               rbd_warn(rbd_dev, "got notification but failed to "
+                          " update snaps: %d\n", rc);
 
        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
 }
 
 /*
- * Request sync osd watch
+ * Request sync osd watch/unwatch.  The value of "start" determines
+ * whether a watch request is being initiated or torn down.
  */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
+static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
 {
-       struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       int ret;
+       struct ceph_osd_req_op *op;
+       int ret = 0;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-       if (!ops)
-               return -ENOMEM;
+       rbd_assert(start ^ !!rbd_dev->watch_event);
+       rbd_assert(start ^ !!rbd_dev->watch_request);
 
-       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-                                    (void *)rbd_dev, &rbd_dev->watch_event);
-       if (ret < 0)
-               goto fail;
+       if (start) {
+               struct ceph_osd_client *osdc;
 
-       ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
-       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-       ops[0].watch.flag = 1;
+               osdc = &rbd_dev->rbd_client->client->osdc;
+               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
+                                               &rbd_dev->watch_event);
+               if (ret < 0)
+                       return ret;
+       }
 
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                             CEPH_NOSNAP,
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+                               rbd_dev->watch_event->cookie,
+                               rbd_dev->header.obj_version, start);
+       if (op)
+               ret = rbd_req_sync_op(rbd_dev,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                             ops,
-                             rbd_dev->header_name,
-                             0, 0, NULL,
-                             &rbd_dev->watch_request, NULL);
+                             op, rbd_dev->header_name,
+                             0, 0, NULL, NULL);
 
-       if (ret < 0)
-               goto fail_event;
-
-       rbd_destroy_ops(ops);
-       return 0;
-
-fail_event:
-       ceph_osdc_cancel_event(rbd_dev->watch_event);
-       rbd_dev->watch_event = NULL;
-fail:
-       rbd_destroy_ops(ops);
-       return ret;
-}
-
-/*
- * Request sync osd unwatch
- */
-static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
-{
-       struct ceph_osd_req_op *ops;
-       int ret;
-
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-       if (!ops)
-               return -ENOMEM;
-
-       ops[0].watch.ver = 0;
-       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-       ops[0].watch.flag = 0;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                             CEPH_NOSNAP,
-                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                             ops,
-                             rbd_dev->header_name,
-                             0, 0, NULL, NULL, NULL);
+       /* Cancel the event if we're tearing down, or on error */
 
+       if (!start || !op || ret < 0) {
+               ceph_osdc_cancel_event(rbd_dev->watch_event);
+               rbd_dev->watch_event = NULL;
+       }
+       rbd_osd_req_op_destroy(op);
 
-       rbd_destroy_ops(ops);
-       ceph_osdc_cancel_event(rbd_dev->watch_event);
-       rbd_dev->watch_event = NULL;
        return ret;
 }
 
@@ -1477,13 +1518,9 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             size_t outbound_size,
                             char *inbound,
                             size_t inbound_size,
-                            int flags,
                             u64 *ver)
 {
-       struct ceph_osd_req_op *ops;
-       int class_name_len = strlen(class_name);
-       int method_name_len = strlen(method_name);
-       int payload_size;
+       struct ceph_osd_req_op *op;
        int ret;
 
        /*
@@ -1494,26 +1531,16 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
         * the perspective of the server side) in the OSD request
         * operation.
         */
-       payload_size = class_name_len + method_name_len + outbound_size;
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
-       if (!ops)
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+                                       method_name, outbound, outbound_size);
+       if (!op)
                return -ENOMEM;
 
-       ops[0].cls.class_name = class_name;
-       ops[0].cls.class_len = (__u8) class_name_len;
-       ops[0].cls.method_name = method_name;
-       ops[0].cls.method_len = (__u8) method_name_len;
-       ops[0].cls.argc = 0;
-       ops[0].cls.indata = outbound;
-       ops[0].cls.indata_len = outbound_size;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              CEPH_NOSNAP,
-                              flags, ops,
+       ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
                               object_name, 0, inbound_size, inbound,
-                              NULL, ver);
+                              ver);
 
-       rbd_destroy_ops(ops);
+       rbd_osd_req_op_destroy(op);
 
        dout("cls_exec returned %d\n", ret);
        return ret;
@@ -1533,113 +1560,123 @@ static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
        return coll;
 }
 
+static int rbd_dev_do_request(struct request *rq,
+                               struct rbd_device *rbd_dev,
+                               struct ceph_snap_context *snapc,
+                               u64 ofs, unsigned int size,
+                               struct bio *bio_chain)
+{
+       int num_segs;
+       struct rbd_req_coll *coll;
+       unsigned int bio_offset;
+       int cur_seg = 0;
+
+       dout("%s 0x%x bytes at 0x%llx\n",
+               rq_data_dir(rq) == WRITE ? "write" : "read",
+               size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
+
+       num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+       if (num_segs <= 0)
+               return num_segs;
+
+       coll = rbd_alloc_coll(num_segs);
+       if (!coll)
+               return -ENOMEM;
+
+       bio_offset = 0;
+       do {
+               u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+               unsigned int clone_size;
+               struct bio *bio_clone;
+
+               BUG_ON(limit > (u64)UINT_MAX);
+               clone_size = (unsigned int)limit;
+               dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
+
+               kref_get(&coll->kref);
+
+               /* Pass a cloned bio chain via an osd request */
+
+               bio_clone = bio_chain_clone_range(&bio_chain,
+                                       &bio_offset, clone_size,
+                                       GFP_ATOMIC);
+               if (bio_clone)
+                       (void)rbd_do_op(rq, rbd_dev, snapc,
+                                       ofs, clone_size,
+                                       bio_clone, coll, cur_seg);
+               else
+                       rbd_coll_end_req_index(rq, coll, cur_seg,
+                                               (s32)-ENOMEM,
+                                               clone_size);
+               size -= clone_size;
+               ofs += clone_size;
+
+               cur_seg++;
+       } while (size > 0);
+       kref_put(&coll->kref, rbd_coll_release);
+
+       return 0;
+}
+
 /*
  * block device queue callback
  */
 static void rbd_rq_fn(struct request_queue *q)
 {
        struct rbd_device *rbd_dev = q->queuedata;
+       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
 
        while ((rq = blk_fetch_request(q))) {
-               struct bio *bio;
-               bool do_write;
-               unsigned int size;
-               u64 ofs;
-               int num_segs, cur_seg = 0;
-               struct rbd_req_coll *coll;
-               struct ceph_snap_context *snapc;
-               unsigned int bio_offset;
+               struct ceph_snap_context *snapc = NULL;
+               unsigned int size = 0;
+               int result;
 
                dout("fetched request\n");
 
-               /* filter out block requests we don't understand */
+               /* Filter out block requests we don't understand */
+
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }
-
-               /* deduce our operation (read, write) */
-               do_write = (rq_data_dir(rq) == WRITE);
-               if (do_write && rbd_dev->mapping.read_only) {
-                       __blk_end_request_all(rq, -EROFS);
-                       continue;
-               }
-
                spin_unlock_irq(q->queue_lock);
 
-               down_read(&rbd_dev->header_rwsem);
-
-               if (!rbd_dev->exists) {
-                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+               /* Write requests need a reference to the snapshot context */
+
+               if (rq_data_dir(rq) == WRITE) {
+                       result = -EROFS;
+                       if (read_only) /* Can't write to a read-only device */
+                               goto out_end_request;
+
+                       /*
+                        * Note that each osd request will take its
+                        * own reference to the snapshot context
+                        * supplied.  The reference we take here
+                        * just guarantees the one we provide stays
+                        * valid.
+                        */
+                       down_read(&rbd_dev->header_rwsem);
+                       snapc = ceph_get_snap_context(rbd_dev->header.snapc);
                        up_read(&rbd_dev->header_rwsem);
+                       rbd_assert(snapc != NULL);
+               } else if (!atomic_read(&rbd_dev->exists)) {
+                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        dout("request for non-existent snapshot");
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, -ENXIO);
-                       continue;
+                       result = -ENXIO;
+                       goto out_end_request;
                }
 
-               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
-
-               up_read(&rbd_dev->header_rwsem);
-
                size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-               bio = rq->bio;
-
-               dout("%s 0x%x bytes at 0x%llx\n",
-                    do_write ? "write" : "read",
-                    size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
-
-               num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
-               if (num_segs <= 0) {
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, num_segs);
+               result = rbd_dev_do_request(rq, rbd_dev, snapc,
+                               blk_rq_pos(rq) * SECTOR_SIZE,
+                               size, rq->bio);
+out_end_request:
+               if (snapc)
                        ceph_put_snap_context(snapc);
-                       continue;
-               }
-               coll = rbd_alloc_coll(num_segs);
-               if (!coll) {
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, -ENOMEM);
-                       ceph_put_snap_context(snapc);
-                       continue;
-               }
-
-               bio_offset = 0;
-               do {
-                       u64 limit = rbd_segment_length(rbd_dev, ofs, size);
-                       unsigned int chain_size;
-                       struct bio *bio_chain;
-
-                       BUG_ON(limit > (u64) UINT_MAX);
-                       chain_size = (unsigned int) limit;
-                       dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-
-                       kref_get(&coll->kref);
-
-                       /* Pass a cloned bio chain via an osd request */
-
-                       bio_chain = bio_chain_clone_range(&bio,
-                                               &bio_offset, chain_size,
-                                               GFP_ATOMIC);
-                       if (bio_chain)
-                               (void) rbd_do_op(rq, rbd_dev, snapc,
-                                               ofs, chain_size,
-                                               bio_chain, coll, cur_seg);
-                       else
-                               rbd_coll_end_req_index(rq, coll, cur_seg,
-                                                      -ENOMEM, chain_size);
-                       size -= chain_size;
-                       ofs += chain_size;
-
-                       cur_seg++;
-               } while (size > 0);
-               kref_put(&coll->kref, rbd_coll_release);
-
                spin_lock_irq(q->queue_lock);
-
-               ceph_put_snap_context(snapc);
+               if (!size || result < 0)
+                       __blk_end_request_all(rq, result);
        }
 }
 
@@ -1741,8 +1778,7 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);
 
-               ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
-                                      rbd_dev->header_name,
+               ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);
 
@@ -1750,15 +1786,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
-                       pr_warning("short header read for image %s"
-                                       " (want %zd got %d)\n",
-                               rbd_dev->spec->image_name, size, ret);
+                       rbd_warn(rbd_dev, "short header read (want %zd got %d)",
+                               size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
-                       pr_warning("invalid header for image %s\n",
-                               rbd_dev->spec->image_name);
+                       rbd_warn(rbd_dev, "invalid header");
                        goto out_err;
                }
 
@@ -2243,6 +2277,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                return NULL;
 
        spin_lock_init(&rbd_dev->lock);
+       atomic_set(&rbd_dev->exists, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
@@ -2250,6 +2285,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->spec = spec;
        rbd_dev->rbd_client = rbdc;
 
+       /* Initialize the layout used for all rbd requests */
+
+       rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+       rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
+       rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+       rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
+
        return rbd_dev;
 }
 
@@ -2363,8 +2405,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
-                               (char *) &size_buf, sizeof (size_buf),
-                               CEPH_OSD_FLAG_READ, NULL);
+                               (char *) &size_buf, sizeof (size_buf), NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
@@ -2399,8 +2440,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
-                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -2439,7 +2479,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
-                               CEPH_OSD_FLAG_READ, NULL);
+                               NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
@@ -2474,7 +2514,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        void *end;
        char *image_id;
        u64 overlap;
-       size_t len = 0;
        int ret;
 
        parent_spec = rbd_spec_alloc();
@@ -2495,8 +2534,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_parent",
                                (char *) &snapid, sizeof (snapid),
-                               (char *) reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               (char *) reply_buf, size, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;
@@ -2508,13 +2546,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        if (parent_spec->pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
 
-       image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+       /* The ceph file layout needs to fit pool id in 32 bits */
+
+       ret = -EIO;
+       if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
+               goto out;
+
+       image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
                ret = PTR_ERR(image_id);
                goto out_err;
        }
        parent_spec->image_id = image_id;
-       parent_spec->image_id_len = len;
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
@@ -2544,15 +2587,15 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 
        rbd_assert(!rbd_dev->spec->image_name);
 
-       image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+       len = strlen(rbd_dev->spec->image_id);
+       image_id_size = sizeof (__le32) + len;
        image_id = kmalloc(image_id_size, GFP_KERNEL);
        if (!image_id)
                return NULL;
 
        p = image_id;
        end = (char *) image_id + image_id_size;
-       ceph_encode_string(&p, end, rbd_dev->spec->image_id,
-                               (u32) rbd_dev->spec->image_id_len);
+       ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
 
        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
@@ -2562,8 +2605,7 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
        ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
-                               (char *) reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               (char *) reply_buf, size, NULL);
        if (ret < 0)
                goto out;
        p = reply_buf;
@@ -2602,8 +2644,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
 
        osdc = &rbd_dev->rbd_client->client->osdc;
        name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-       if (!name)
-               return -EIO;    /* pool id too large (>= 2^31) */
+       if (!name) {
+               rbd_warn(rbd_dev, "there is no pool with id %llu",
+                       rbd_dev->spec->pool_id);        /* Really a BUG() */
+               return -EIO;
+       }
 
        rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
        if (!rbd_dev->spec->pool_name)
@@ -2612,19 +2657,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
        /* Fetch the image name; tolerate failure here */
 
        name = rbd_dev_image_name(rbd_dev);
-       if (name) {
-               rbd_dev->spec->image_name_len = strlen(name);
+       if (name)
                rbd_dev->spec->image_name = (char *) name;
-       } else {
-               pr_warning(RBD_DRV_NAME "%d "
-                       "unable to get image name for image id %s\n",
-                       rbd_dev->major, rbd_dev->spec->image_id);
-       }
+       else
+               rbd_warn(rbd_dev, "unable to get image name");
 
        /* Look up the snapshot name. */
 
        name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
        if (!name) {
+               rbd_warn(rbd_dev, "no snapshot with id %llu",
+                       rbd_dev->spec->snap_id);        /* Really a BUG() */
                ret = -EIO;
                goto out_err;
        }
@@ -2668,8 +2711,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapcontext",
                                NULL, 0,
-                               reply_buf, size,
-                               CEPH_OSD_FLAG_READ, ver);
+                               reply_buf, size, ver);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -2738,8 +2780,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
-                               reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               reply_buf, size, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -2766,7 +2807,7 @@ out:
 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
 {
-       __le64 snap_id;
+       u64 snap_id;
        u8 order;
        int ret;
 
@@ -2868,7 +2909,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                        /* Existing snapshot not in the new snap context */
 
                        if (rbd_dev->spec->snap_id == snap->id)
-                               rbd_dev->exists = false;
+                               atomic_set(&rbd_dev->exists, 0);
                        rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->spec->snap_id == snap->id ?
@@ -2983,22 +3024,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
        device_unregister(&rbd_dev->dev);
 }
 
-static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
-{
-       int ret, rc;
-
-       do {
-               ret = rbd_req_sync_watch(rbd_dev);
-               if (ret == -ERANGE) {
-                       rc = rbd_dev_refresh(rbd_dev, NULL);
-                       if (rc < 0)
-                               return rc;
-               }
-       } while (ret == -ERANGE);
-
-       return ret;
-}
-
 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
 
 /*
@@ -3138,11 +3163,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
        size_t len;
 
        len = next_token(buf);
-       dup = kmalloc(len + 1, GFP_KERNEL);
+       dup = kmemdup(*buf, len + 1, GFP_KERNEL);
        if (!dup)
                return NULL;
-
-       memcpy(dup, *buf, len);
        *(dup + len) = '\0';
        *buf += len;
 
@@ -3210,8 +3233,10 @@ static int rbd_add_parse_args(const char *buf,
        /* The first four tokens are required */
 
        len = next_token(&buf);
-       if (!len)
-               return -EINVAL; /* Missing monitor address(es) */
+       if (!len) {
+               rbd_warn(NULL, "no monitor address(es) provided");
+               return -EINVAL;
+       }
        mon_addrs = buf;
        mon_addrs_size = len + 1;
        buf += len;
@@ -3220,8 +3245,10 @@ static int rbd_add_parse_args(const char *buf,
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
-       if (!*options)
-               goto out_err;   /* Missing options */
+       if (!*options) {
+               rbd_warn(NULL, "no options provided");
+               goto out_err;
+       }
 
        spec = rbd_spec_alloc();
        if (!spec)
@@ -3230,14 +3257,18 @@ static int rbd_add_parse_args(const char *buf,
        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
-       if (!*spec->pool_name)
-               goto out_err;   /* Missing pool name */
+       if (!*spec->pool_name) {
+               rbd_warn(NULL, "no pool name provided");
+               goto out_err;
+       }
 
-       spec->image_name = dup_token(&buf, &spec->image_name_len);
+       spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
-       if (!*spec->image_name)
-               goto out_err;   /* Missing image name */
+       if (!*spec->image_name) {
+               rbd_warn(NULL, "no image name provided");
+               goto out_err;
+       }
 
        /*
         * Snapshot name is optional; default is to use "-"
@@ -3251,10 +3282,9 @@ static int rbd_add_parse_args(const char *buf,
                ret = -ENAMETOOLONG;
                goto out_err;
        }
-       spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+       spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!spec->snap_name)
                goto out_mem;
-       memcpy(spec->snap_name, buf, len);
        *(spec->snap_name + len) = '\0';
 
        /* Initialize all rbd options to the defaults */
@@ -3323,7 +3353,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
-       size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
+       size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
@@ -3342,8 +3372,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
-                               response, RBD_IMAGE_ID_LEN_MAX,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               response, RBD_IMAGE_ID_LEN_MAX, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -3352,8 +3381,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
-                                               &rbd_dev->spec->image_id_len,
-                                               GFP_NOIO);
+                                               NULL, GFP_NOIO);
        if (IS_ERR(rbd_dev->spec->image_id)) {
                ret = PTR_ERR(rbd_dev->spec->image_id);
                rbd_dev->spec->image_id = NULL;
@@ -3377,11 +3405,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
        rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->spec->image_id)
                return -ENOMEM;
-       rbd_dev->spec->image_id_len = 0;
 
        /* Record the header object name for this rbd image. */
 
-       size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
+       size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
@@ -3427,7 +3454,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
-       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
+       size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
@@ -3542,7 +3569,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_bus;
 
-       ret = rbd_init_watch_dev(rbd_dev);
+       ret = rbd_req_sync_watch(rbd_dev, 1);
        if (ret)
                goto err_out_bus;
 
@@ -3638,6 +3665,13 @@ static ssize_t rbd_add(struct bus_type *bus,
                goto err_out_client;
        spec->pool_id = (u64) rc;
 
+       /* The ceph file layout needs to fit pool id in 32 bits */
+
+       if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
+               rc = -EIO;
+               goto err_out_client;
+       }
+
        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
@@ -3698,8 +3732,7 @@ static void rbd_dev_release(struct device *dev)
                                                    rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
-               rbd_req_sync_unwatch(rbd_dev);
-
+               rbd_req_sync_watch(rbd_dev, 0);
 
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
index a1d9bb30c1bf9fc5460286ff465e2bcbc338bf30..1e1e02055a2b5370b375bd29b9138cf6798b81a6 100644 (file)
@@ -611,8 +611,16 @@ retry:
 
        if (flags & CEPH_CAP_FLAG_AUTH)
                ci->i_auth_cap = cap;
-       else if (ci->i_auth_cap == cap)
+       else if (ci->i_auth_cap == cap) {
                ci->i_auth_cap = NULL;
+               spin_lock(&mdsc->cap_dirty_lock);
+               if (!list_empty(&ci->i_dirty_item)) {
+                       dout(" moving %p to cap_dirty_migrating\n", inode);
+                       list_move(&ci->i_dirty_item,
+                                 &mdsc->cap_dirty_migrating);
+               }
+               spin_unlock(&mdsc->cap_dirty_lock);
+       }
 
        dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
             inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap *cap;
-       int file_wanted, used;
+       int file_wanted, used, cap_used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
        int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
@@ -1563,9 +1571,14 @@ retry_locked:
 
                /* NOTE: no side-effects allowed, until we take s_mutex */
 
+               cap_used = used;
+               if (ci->i_auth_cap && cap != ci->i_auth_cap)
+                       cap_used &= ~ci->i_auth_cap->issued;
+
                revoking = cap->implemented & ~cap->issued;
-               dout(" mds%d cap %p issued %s implemented %s revoking %s\n",
+               dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
                     cap->mds, cap, ceph_cap_string(cap->issued),
+                    ceph_cap_string(cap_used),
                     ceph_cap_string(cap->implemented),
                     ceph_cap_string(revoking));
 
@@ -1593,7 +1606,7 @@ retry_locked:
                }
 
                /* completed revocation? going down and there are no caps? */
-               if (revoking && (revoking & used) == 0) {
+               if (revoking && (revoking & cap_used) == 0) {
                        dout("completed revocation of %s\n",
                             ceph_cap_string(cap->implemented & ~cap->issued));
                        goto ack;
@@ -1670,8 +1683,8 @@ ack:
                sent++;
 
                /* __send_cap drops i_ceph_lock */
-               delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
-                                     retain, flushing, NULL);
+               delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
+                                     want, retain, flushing, NULL);
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
@@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                dout("mds wanted %s -> %s\n",
                     ceph_cap_string(le32_to_cpu(grant->wanted)),
                     ceph_cap_string(wanted));
-               grant->wanted = cpu_to_le32(wanted);
+               /* imported cap may not have correct mds_wanted */
+               if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
+                       check_caps = 1;
        }
 
        cap->seq = seq;
@@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
             (unsigned)seq);
 
+       if (op == CEPH_CAP_OP_IMPORT)
+               ceph_add_cap_releases(mdsc, session);
+
        /* lookup ino */
        inode = ceph_find_inode(sb, vino);
        ci = ceph_inode(inode);
index e51558fca3a346a93797561e26e148de037d971a..a1e5b81e8118331b8bd5a1f9b9f72e5ad4064ca0 100644 (file)
@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
+       if (err)
+               goto out_err;
+
        err = ceph_handle_snapdir(req, dentry, err);
        if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                err = finish_no_open(file, dn);
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
+               if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
+                       *opened |= FILE_CREATED;
+               }
                err = finish_open(file, dentry, ceph_open, opened);
        }
 
index 36549a46e311cc7f7b717634e5723dbeddb58f5f..3b22150d3e1949dd97d5d3ea1eacfa50e11b9eec 100644 (file)
@@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
                return -EFAULT;
 
        down_read(&osdc->map_sem);
-       r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
+       r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
                                          &dl.object_no, &dl.object_offset,
                                          &olen);
        if (r < 0)
index 9165eb8309eba442efa237aaa313a43c92641607..d95842036c8b11b570f0c5a998087527d3b351e3 100644 (file)
@@ -232,6 +232,30 @@ bad:
        return -EIO;
 }
 
+/*
+ * parse create results
+ */
+static int parse_reply_info_create(void **p, void *end,
+                                 struct ceph_mds_reply_info_parsed *info,
+                                 int features)
+{
+       if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+               if (*p == end) {
+                       info->has_create_ino = false;
+               } else {
+                       info->has_create_ino = true;
+                       info->ino = ceph_decode_64(p);
+               }
+       }
+
+       if (unlikely(*p != end))
+               goto bad;
+       return 0;
+
+bad:
+       return -EIO;
+}
+
 /*
  * parse extra results
  */
@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end,
 {
        if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
-       else
+       else if (info->head->op == CEPH_MDS_OP_READDIR)
                return parse_reply_info_dir(p, end, info, features);
+       else if (info->head->op == CEPH_MDS_OP_CREATE)
+               return parse_reply_info_create(p, end, info, features);
+       else
+               return -EIO;
 }
 
 /*
@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        mutex_lock(&req->r_fill_mutex);
        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
        if (err == 0) {
-               if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
+               if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
+                                   req->r_op == CEPH_MDS_OP_LSSNAP) &&
                    rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
index dd26846dd71de4267146b7deb43c7a5d7b9b976f..567f7c60354e916fe5b34679d53313a361c2b08d 100644 (file)
@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed {
                        struct ceph_mds_reply_info_in *dir_in;
                        u8                            dir_complete, dir_end;
                };
+
+               /* for create results */
+               struct {
+                       bool has_create_ino;
+                       u64 ino;
+               };
        };
 
        /* encoded blob describing snapshot contexts for certain
index dad579b0c0e65981ea42c7a820f71e0480154b17..2160aab482f628d751de1fa682fc59f0a8ea9e11 100644 (file)
 #define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
 /* bits 8-17 defined by user-space; not supported yet here */
 #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+/* bits 19-24 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25)
+/* bit 26 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27)
 
 /*
  * Features supported.
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  \
        (CEPH_FEATURE_NOSRCADDR |        \
-        CEPH_FEATURE_CRUSH_TUNABLES)
+        CEPH_FEATURE_CRUSH_TUNABLES |    \
+        CEPH_FEATURE_CRUSH_TUNABLES2 |   \
+        CEPH_FEATURE_REPLY_CREATE_INODE)
 
 #define CEPH_FEATURES_REQUIRED_DEFAULT   \
        (CEPH_FEATURE_NOSRCADDR)
index 63d092822bad0b07064b869244ed394dbad1b818..360d9d08ca9e2a12a56e85b5625a938d8a2f6b3a 100644 (file)
@@ -52,10 +52,10 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
        return end >= *p && n <= end - *p;
 }
 
-#define ceph_decode_need(p, end, n, bad)               \
-       do {                                            \
-               if (!likely(ceph_has_room(p, end, n)))  \
-                       goto bad;                       \
+#define ceph_decode_need(p, end, n, bad)                       \
+       do {                                                    \
+               if (!likely(ceph_has_room(p, end, n)))          \
+                       goto bad;                               \
        } while (0)
 
 #define ceph_decode_64_safe(p, end, v, bad)                    \
@@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
  *
  * There are two possible failures:
  *   - converting the string would require accessing memory at or
- *     beyond the "end" pointer provided (-E
- *   - memory could not be allocated for the result
+ *     beyond the "end" pointer provided (-ERANGE)
+ *   - memory could not be allocated for the result (-ENOMEM)
  */
 static inline char *ceph_extract_encoded_string(void **p, void *end,
                                                size_t *lenp, gfp_t gfp)
@@ -217,10 +217,10 @@ static inline void ceph_encode_string(void **p, void *end,
        *p += len;
 }
 
-#define ceph_encode_need(p, end, n, bad)               \
-       do {                                            \
-               if (!likely(ceph_has_room(p, end, n)))  \
-                       goto bad;                       \
+#define ceph_encode_need(p, end, n, bad)                       \
+       do {                                                    \
+               if (!likely(ceph_has_room(p, end, n)))          \
+                       goto bad;                               \
        } while (0)
 
 #define ceph_encode_64_safe(p, end, v, bad)                    \
@@ -231,12 +231,17 @@ static inline void ceph_encode_string(void **p, void *end,
 #define ceph_encode_32_safe(p, end, v, bad)                    \
        do {                                                    \
                ceph_encode_need(p, end, sizeof(u32), bad);     \
-               ceph_encode_32(p, v);                   \
+               ceph_encode_32(p, v);                           \
        } while (0)
 #define ceph_encode_16_safe(p, end, v, bad)                    \
        do {                                                    \
                ceph_encode_need(p, end, sizeof(u16), bad);     \
-               ceph_encode_16(p, v);                   \
+               ceph_encode_16(p, v);                           \
+       } while (0)
+#define ceph_encode_8_safe(p, end, v, bad)                     \
+       do {                                                    \
+               ceph_encode_need(p, end, sizeof(u8), bad);      \
+               ceph_encode_8(p, v);                            \
        } while (0)
 
 #define ceph_encode_copy_safe(p, end, pv, n, bad)              \
index d9b880e977e62fa5dcf853baf8dbffedf8c38cda..69287ccfe68a5a5be24a87f3e788620ab0997d20 100644 (file)
@@ -10,6 +10,7 @@
 #include <linux/ceph/osdmap.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
 
 /* 
  * Maximum object name size 
@@ -22,7 +23,6 @@ struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
 struct ceph_authorizer;
-struct ceph_pagelist;
 
 /*
  * completion callback for async writepages
@@ -95,7 +95,7 @@ struct ceph_osd_request {
        struct bio       *r_bio;              /* instead of pages */
 #endif
 
-       struct ceph_pagelist *r_trail;        /* trailing part of the data */
+       struct ceph_pagelist r_trail;         /* trailing part of the data */
 };
 
 struct ceph_osd_event {
@@ -157,7 +157,6 @@ struct ceph_osd_client {
 
 struct ceph_osd_req_op {
        u16 op;           /* CEPH_OSD_OP_* */
-       u32 flags;        /* CEPH_OSD_FLAG_* */
        union {
                struct {
                        u64 offset, length;
@@ -207,29 +206,24 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
                                 struct ceph_msg *msg);
 
-extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-                       struct ceph_file_layout *layout,
-                       u64 snapid,
+extern int ceph_calc_raw_layout(struct ceph_file_layout *layout,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op);
 
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-                                              int flags,
                                               struct ceph_snap_context *snapc,
-                                              struct ceph_osd_req_op *ops,
+                                              unsigned int num_op,
                                               bool use_mempool,
-                                              gfp_t gfp_flags,
-                                              struct page **pages,
-                                              struct bio *bio);
+                                              gfp_t gfp_flags);
 
 extern void ceph_osdc_build_request(struct ceph_osd_request *req,
-                                   u64 off, u64 *plen,
+                                   u64 off, u64 len,
+                                   unsigned int num_op,
                                    struct ceph_osd_req_op *src_ops,
                                    struct ceph_snap_context *snapc,
-                                   struct timespec *mtime,
-                                   const char *oid,
-                                   int oid_len);
+                                   u64 snap_id,
+                                   struct timespec *mtime);
 
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      struct ceph_file_layout *layout,
index 10a417f9f76fa9ccd2e2fffbc27950dbc355df59..c83a838f89f5ad8ceb1cb131e8840369fd72d1dd 100644 (file)
@@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
 
 /* calculate mapping of a file extent to an object */
 extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-                                        u64 off, u64 *plen,
+                                        u64 off, u64 len,
                                         u64 *bno, u64 *oxoff, u64 *oxlen);
 
 /* calculate mapping of object to a placement group */
index 25baa287cff7287d8e6cc6f5b94255ef4cf86f9e..6a1101f24cfba84eaf6210f96802984009462dbd 100644 (file)
@@ -162,6 +162,8 @@ struct crush_map {
        __u32 choose_local_fallback_tries;
        /* choose attempts before giving up */ 
        __u32 choose_total_tries;
+       /* attempt chooseleaf inner descent once; on failure retry outer descent */
+       __u32 chooseleaf_descend_once;
 };
 
 
index 35fce755ce103528071d422ca6e9ca2e0faf6a27..cbd06a91941c15f3e88b9dd6671694e2fe76dbe2 100644 (file)
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
  * @outpos: our position in that vector
  * @firstn: true if choosing "first n" items, false if choosing "indep"
  * @recurse_to_leaf: true if we want one device under each item of given type
+ * @descend_once: true if we should only try one descent before giving up
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
  */
 static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
                        int x, int numrep, int type,
                        int *out, int outpos,
                        int firstn, int recurse_to_leaf,
-                       int *out2)
+                       int descend_once, int *out2)
 {
        int rep;
        unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
                                }
 
                                reject = 0;
-                               if (recurse_to_leaf) {
+                               if (!collide && recurse_to_leaf) {
                                        if (item < 0) {
                                                if (crush_choose(map,
                                                         map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
                                                         x, outpos+1, 0,
                                                         out2, outpos,
                                                         firstn, 0,
+                                                        map->chooseleaf_descend_once,
                                                         NULL) <= outpos)
                                                        /* didn't get leaf */
                                                        reject = 1;
@@ -422,7 +424,10 @@ reject:
                                        ftotal++;
                                        flocal++;
 
-                                       if (collide && flocal <= map->choose_local_tries)
+                                       if (reject && descend_once)
+                                               /* let outer call try again */
+                                               skip_rep = 1;
+                                       else if (collide && flocal <= map->choose_local_tries)
                                                /* retry locally a few times */
                                                retry_bucket = 1;
                                        else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
        int i, j;
        int numrep;
        int firstn;
+       const int descend_once = 0;
 
        if ((__u32)ruleno >= map->max_rules) {
                dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
                                                      curstep->arg2,
                                                      o+osize, j,
                                                      firstn,
-                                                     recurse_to_leaf, c+osize);
+                                                     recurse_to_leaf,
+                                                     descend_once, c+osize);
                        }
 
                        if (recurse_to_leaf)
index eb9a4447876481e9a4110a1e68ea351ce3ec86d9..500ae8b493216a43251822eb728459c10a417093 100644 (file)
@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-       switch (op) {
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_NOTIFY:
-               return 1;
-       default:
-               return 0;
-       }
-}
-
 static int op_has_extent(int op)
 {
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
 }
 
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-                       struct ceph_file_layout *layout,
-                       u64 snapid,
+int ceph_calc_raw_layout(struct ceph_file_layout *layout,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
 {
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
        int r;
 
-       reqhead->snapid = cpu_to_le64(snapid);
-
        /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
                                          &objoff, &objlen);
        if (r < 0)
                return r;
-       if (*plen < orig_len)
+       if (objlen < orig_len) {
+               *plen = objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
+       }
 
        if (op_has_extent(op->op)) {
+               u32 osize = le32_to_cpu(layout->fl_object_size);
                op->extent.offset = objoff;
                op->extent.length = objlen;
+               if (op->extent.truncate_size <= off - objoff) {
+                       op->extent.truncate_size = 0;
+               } else {
+                       op->extent.truncate_size -= off - objoff;
+                       if (op->extent.truncate_size > osize)
+                               op->extent.truncate_size = osize;
+               }
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static int calc_layout(struct ceph_osd_client *osdc,
-                      struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 *plen,
                       struct ceph_osd_request *req,
@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
        u64 bno;
        int r;
 
-       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                                plen, &bno, req, op);
+       r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
        if (r < 0)
                return r;
 
@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref)
                bio_put(req->r_bio);
 #endif
        ceph_put_snap_context(req->r_snapc);
-       if (req->r_trail) {
-               ceph_pagelist_release(req->r_trail);
-               kfree(req->r_trail);
-       }
+       ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
-       int i = 0;
-
-       if (needs_trail)
-               *needs_trail = 0;
-       while (ops[i].op) {
-               if (needs_trail && op_needs_trail(ops[i].op))
-                       *needs_trail = 1;
-               i++;
-       }
-
-       return i;
-}
-
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-                                              int flags,
                                               struct ceph_snap_context *snapc,
-                                              struct ceph_osd_req_op *ops,
+                                              unsigned int num_op,
                                               bool use_mempool,
-                                              gfp_t gfp_flags,
-                                              struct page **pages,
-                                              struct bio *bio)
+                                              gfp_t gfp_flags)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int needs_trail;
-       int num_op = get_num_ops(ops, &needs_trail);
        size_t msg_size = sizeof(struct ceph_osd_request_head);
 
        msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
-       req->r_flags = flags;
-
-       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       /* allocate space for the trailing data */
-       if (needs_trail) {
-               req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-               if (!req->r_trail) {
-                       ceph_osdc_put_request(req);
-                       return NULL;
-               }
-               ceph_pagelist_init(req->r_trail);
-       }
+       ceph_pagelist_init(&req->r_trail);
 
        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        memset(msg->front.iov_base, 0, msg->front.iov_len);
 
        req->r_request = msg;
-       req->r_pages = pages;
-#ifdef CONFIG_BLOCK
-       if (bio) {
-               req->r_bio = bio;
-               bio_get(req->r_bio);
-       }
-#endif
 
        return req;
 }
@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
-               BUG_ON(!req->r_trail);
-
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               ceph_pagelist_append(req->r_trail, src->xattr.name,
+               ceph_pagelist_append(&req->r_trail, src->xattr.name,
                                     src->xattr.name_len);
-               ceph_pagelist_append(req->r_trail, src->xattr.val,
+               ceph_pagelist_append(&req->r_trail, src->xattr.val,
                                     src->xattr.value_len);
                break;
        case CEPH_OSD_OP_CALL:
-               BUG_ON(!req->r_trail);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-               ceph_pagelist_append(req->r_trail, src->cls.class_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
-               ceph_pagelist_append(req->r_trail, src->cls.method_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
-               ceph_pagelist_append(req->r_trail, src->cls.indata,
+               ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                        __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
                        __le32 timeout = cpu_to_le32(src->watch.timeout);
 
-                       BUG_ON(!req->r_trail);
-
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &prot_ver, sizeof(prot_ver));
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &timeout, sizeof(timeout));
                }
        case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 *plen,
+                            u64 off, u64 len, unsigned int num_op,
                             struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc,
-                            struct timespec *mtime,
-                            const char *oid,
-                            int oid_len)
+                            struct ceph_snap_context *snapc, u64 snap_id,
+                            struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
        struct ceph_osd_op *op;
        void *p;
-       int num_op = get_num_ops(src_ops, NULL);
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int flags = req->r_flags;
        u64 data_len = 0;
        int i;
 
+       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
        head = msg->front.iov_base;
+       head->snapid = cpu_to_le64(snap_id);
        op = (void *)(head + 1);
        p = (void *)(op + num_op);
 
@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
        head->flags = cpu_to_le32(flags);
        if (flags & CEPH_OSD_FLAG_WRITE)
                ceph_encode_timespec(&head->mtime, mtime);
+       BUG_ON(num_op > (unsigned int) ((u16) -1));
        head->num_ops = cpu_to_le16(num_op);
 
-
        /* fill in oid */
-       head->object_len = cpu_to_le32(oid_len);
-       memcpy(p, oid, oid_len);
-       p += oid_len;
+       head->object_len = cpu_to_le32(req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
 
        src_op = src_ops;
-       while (src_op->op) {
-               osd_req_encode_op(req, op, src_op);
-               src_op++;
-               op++;
-       }
+       while (num_op--)
+               osd_req_encode_op(req, op++, src_op++);
 
-       if (req->r_trail)
-               data_len += req->r_trail->length;
+       data_len += req->r_trail.length;
 
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
-               req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+               req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
        } else if (data_len) {
                req->r_request->hdr.data_off = 0;
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               bool use_mempool, int num_reply,
                                               int page_align)
 {
-       struct ceph_osd_req_op ops[3];
+       struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
+       unsigned int num_op = 1;
        int r;
 
+       memset(&ops, 0, sizeof ops);
+
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
        ops[0].extent.truncate_size = truncate_size;
-       ops[0].payload_len = 0;
 
        if (do_sync) {
                ops[1].op = CEPH_OSD_OP_STARTSYNC;
-               ops[1].payload_len = 0;
-               ops[2].op = 0;
-       } else
-               ops[1].op = 0;
-
-       req = ceph_osdc_alloc_request(osdc, flags,
-                                        snapc, ops,
-                                        use_mempool,
-                                        GFP_NOFS, NULL, NULL);
+               num_op++;
+       }
+
+       req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+                                       GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
+       req->r_flags = flags;
 
        /* calculate max write size */
-       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(vino, layout, off, plen, req, ops);
        if (r < 0)
                return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;
 
-       ceph_osdc_build_request(req, off, plen, ops,
-                               snapc,
-                               mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(req, off, *plen, num_op, ops,
+                               snapc, vino.snap, mtime);
 
        return req;
 }
@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
-       int ret = 0;
+       struct ceph_entity_addr *peer_addr;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
-               ret = -ENODEV;
-       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-                         &osd->o_con.peer_addr,
-                         sizeof(osd->o_con.peer_addr)) == 0 &&
-                  !ceph_con_opened(&osd->o_con)) {
+
+               return -ENODEV;
+       }
+
+       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+                       !ceph_con_opened(&osd->o_con)) {
+               struct ceph_osd_request *req;
+
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
-               ret = -EAGAIN;
-       } else {
-               ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-                             &osdc->osdmap->osd_addr[osd->o_osd]);
-               osd->o_incarnation++;
+
+               return -EAGAIN;
        }
-       return ret;
+
+       ceph_con_close(&osd->o_con);
+       ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+       osd->o_incarnation++;
+
+       return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
 #endif
-       req->r_request->trail = req->r_trail;
+       req->r_request->trail = &req->r_trail;
 
        register_request(osdc, req);
 
index de73214b5d26c04989905bafc62bd9c056f2131c..3c61e21611d3c5a5e666c9bf6c46126ea95f927b 100644 (file)
 
 char *ceph_osdmap_state_str(char *str, int len, int state)
 {
-       int flag = 0;
-
        if (!len)
-               goto done;
-
-       *str = '\0';
-       if (state) {
-               if (state & CEPH_OSD_EXISTS) {
-                       snprintf(str, len, "exists");
-                       flag = 1;
-               }
-               if (state & CEPH_OSD_UP) {
-                       snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
-                                "up");
-                       flag = 1;
-               }
-       } else {
+               return str;
+
+       if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
+               snprintf(str, len, "exists, up");
+       else if (state & CEPH_OSD_EXISTS)
+               snprintf(str, len, "exists");
+       else if (state & CEPH_OSD_UP)
+               snprintf(str, len, "up");
+       else
                snprintf(str, len, "doesn't exist");
-       }
-done:
+
        return str;
 }
 
@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         c->choose_local_tries = 2;
         c->choose_local_fallback_tries = 5;
         c->choose_total_tries = 19;
+       c->chooseleaf_descend_once = 0;
 
        ceph_decode_need(p, end, 4*sizeof(u32), bad);
        magic = ceph_decode_32(p);
@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         dout("crush decode tunable choose_total_tries = %d",
              c->choose_total_tries);
 
+       ceph_decode_need(p, end, sizeof(u32), done);
+       c->chooseleaf_descend_once = ceph_decode_32(p);
+       dout("crush decode tunable chooseleaf_descend_once = %d",
+            c->chooseleaf_descend_once);
+
 done:
        dout("crush_decode success\n");
        return c;
@@ -1010,7 +1008,7 @@ bad:
  * pass a stride back to the caller.
  */
 int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-                                  u64 off, u64 *plen,
+                                  u64 off, u64 len,
                                   u64 *ono,
                                   u64 *oxoff, u64 *oxlen)
 {
@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
        u32 su_per_object;
        u64 t, su_offset;
 
-       dout("mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
+       dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
             osize, su);
        if (su == 0 || sc == 0)
                goto invalid;
@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 
        /*
         * Calculate the length of the extent being written to the selected
-        * object. This is the minimum of the full length requested (plen) or
+        * object. This is the minimum of the full length requested (len) or
         * the remainder of the current stripe being written to.
         */
-       *oxlen = min_t(u64, *plen, su - su_offset);
-       *plen = *oxlen;
+       *oxlen = min_t(u64, len, su - su_offset);
 
        dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
        return 0;