rbd: do not read in parent info before snap context
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
index 4c95b503b09ee3593ecfe1e2ca035788295b9f80..c4606987e9d1368b435ccc1aa2e2fd48843f272a 100644 (file)
@@ -514,7 +514,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -541,7 +542,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
                return -ENOENT;
 
        (void) get_device(&rbd_dev->dev);
-       set_device_ro(bdev, rbd_dev->mapping.read_only);
 
        return 0;
 }
@@ -559,10 +559,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+       int ret = 0;
+       int val;
+       bool ro;
+       bool ro_changed = false;
+
+       /* get_user() may sleep, so call it before taking rbd_dev->lock */
+       if (get_user(val, (int __user *)(arg)))
+               return -EFAULT;
+
+       ro = val ? true : false;
+       /* Snapshot doesn't allow to write*/
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+               return -EROFS;
+
+       spin_lock_irq(&rbd_dev->lock);
+       /* prevent others open this device */
+       if (rbd_dev->open_count > 1) {
+               ret = -EBUSY;
+               goto out;
+       }
+
+       if (rbd_dev->mapping.read_only != ro) {
+               rbd_dev->mapping.read_only = ro;
+               ro_changed = true;
+       }
+
+out:
+       spin_unlock_irq(&rbd_dev->lock);
+       /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+       if (ret == 0 && ro_changed)
+               set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+       return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+                       unsigned int cmd, unsigned long arg)
+{
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       int ret = 0;
+
+       switch (cmd) {
+       case BLKROSET:
+               ret = rbd_ioctl_set_ro(rbd_dev, arg);
+               break;
+       default:
+               ret = -ENOTTY;
+       }
+
+       return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+                               unsigned int cmd, unsigned long arg)
+{
+       return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
 static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
+       .ioctl                  = rbd_ioctl,
+#ifdef CONFIG_COMPAT
+       .compat_ioctl           = rbd_compat_ioctl,
+#endif
 };
 
 /*
@@ -906,12 +972,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;
 
-       /* Make sure mapping size is consistent with header info */
-
-       if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
-               if (rbd_dev->mapping.size != header->image_size)
-                       rbd_dev->mapping.size = header->image_size;
-
        return 0;
 out_2big:
        ret = -EIO;
@@ -1074,6 +1134,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
        rbd_dev->mapping.features = 0;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+       /* The explicit cast here is needed to drop the const qualifier */
+
+       kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
        char *name;
@@ -1093,20 +1160,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
-               kfree(name);
+               rbd_segment_name_free(name);
                name = NULL;
        }
 
        return name;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-       /* The explicit cast here is needed to drop the const qualifier */
-
-       kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1366,6 +1426,14 @@ static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
 }
 
+static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
+{
+       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
+
+       return obj_request->img_offset <
+           round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
+}
+
 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
@@ -1382,6 +1450,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+            atomic_read(&img_request->kref.refcount));
+       kref_get(&img_request->kref);
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
@@ -1447,11 +1522,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
 {
-       dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
-
+       dout("%s %p\n", __func__, obj_request);
        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
+static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
+{
+       dout("%s %p\n", __func__, obj_request);
+       ceph_osdc_cancel_request(obj_request->osd_req);
+}
+
+/*
+ * Wait for an object request to complete.  If interrupted, cancel the
+ * underlying osd request.
+ */
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+       int ret;
+
+       dout("%s %p\n", __func__, obj_request);
+
+       ret = wait_for_completion_interruptible(&obj_request->completion);
+       if (ret < 0) {
+               dout("%s %p interrupted\n", __func__, obj_request);
+               rbd_obj_request_end(obj_request);
+               return ret;
+       }
+
+       dout("%s %p done\n", __func__, obj_request);
+       return 0;
+}
+
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1478,15 +1579,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
                rbd_img_request_put(img_request);
 }
 
-/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-       dout("%s: obj %p\n", __func__, obj_request);
-
-       return wait_for_completion_interruptible(&obj_request->completion);
-}
-
 /*
  * The default/initial value for all image request flags is 0.  Each
  * is conditionally set to 1 at image request initialization time
@@ -2142,6 +2234,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
        img_request->next_completion = which;
 out:
        spin_unlock_irq(&img_request->completion_lock);
+       rbd_img_request_put(img_request);
 
        if (!more)
                rbd_img_request_complete(img_request);
@@ -2242,6 +2335,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        goto out_unwind;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;
+               rbd_img_request_get(img_request);
 
                if (write_request) {
                        osd_req_op_alloc_hint_init(osd_req, which,
@@ -2674,7 +2768,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
         */
        if (!img_request_write_test(img_request) ||
                !img_request_layered_test(img_request) ||
-               rbd_dev->parent_overlap <= obj_request->img_offset ||
+               !obj_request_overlaps_parent(obj_request) ||
                ((known = obj_request_known_test(obj_request)) &&
                        obj_request_exists_test(obj_request))) {
 
@@ -2864,107 +2958,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
                rbd_dev->header_name, (unsigned long long)notify_id,
                (unsigned int)opcode);
+
+       /*
+        * Until adequate refresh error handling is in place, there is
+        * not much we can do here, except warn.
+        *
+        * See http://tracker.ceph.com/issues/5040
+        */
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
-               rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+               rbd_warn(rbd_dev, "refresh failed: %d\n", ret);
 
-       rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+       ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+       if (ret)
+               rbd_warn(rbd_dev, "notify_ack ret %d\n", ret);
 }
 
 /*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Send a (un)watch request and wait for the ack.  Return a request
+ * with a ref held on success or error.
  */
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static struct rbd_obj_request *rbd_obj_watch_request_helper(
+                                               struct rbd_device *rbd_dev,
+                                               bool watch)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;
 
-       rbd_assert(start ^ !!rbd_dev->watch_event);
-       rbd_assert(start ^ !!rbd_dev->watch_request);
-
-       if (start) {
-               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-                                               &rbd_dev->watch_event);
-               if (ret < 0)
-                       return ret;
-               rbd_assert(rbd_dev->watch_event != NULL);
-       }
-
-       ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-                                                       OBJ_REQUEST_NODATA);
+                                            OBJ_REQUEST_NODATA);
        if (!obj_request)
-               goto out_cancel;
+               return ERR_PTR(-ENOMEM);
 
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
                                                  obj_request);
-       if (!obj_request->osd_req)
-               goto out_cancel;
-
-       if (start)
-               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-       else
-               ceph_osdc_unregister_linger_request(osdc,
-                                       rbd_dev->watch_request->osd_req);
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out;
+       }
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+                             rbd_dev->watch_event->cookie, 0, watch);
        rbd_osd_req_format_write(obj_request);
 
+       if (watch)
+               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
-               goto out_cancel;
+               goto out;
+
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
-               goto out_cancel;
+               goto out;
+
        ret = obj_request->result;
-       if (ret)
-               goto out_cancel;
+       if (ret) {
+               if (watch)
+                       rbd_obj_request_end(obj_request);
+               goto out;
+       }
+
+       return obj_request;
+
+out:
+       rbd_obj_request_put(obj_request);
+       return ERR_PTR(ret);
+}
+
+/*
+ * Initiate a watch request, synchronously.
+ */
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       int ret;
+
+       rbd_assert(!rbd_dev->watch_event);
+       rbd_assert(!rbd_dev->watch_request);
+
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                    &rbd_dev->watch_event);
+       if (ret < 0)
+               return ret;
+
+       obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+       if (IS_ERR(obj_request)) {
+               ceph_osdc_cancel_event(rbd_dev->watch_event);
+               rbd_dev->watch_event = NULL;
+               return PTR_ERR(obj_request);
+       }
 
        /*
         * A watch request is set to linger, so the underlying osd
         * request won't go away until we unregister it.  We retain
         * a pointer to the object request during that time (in
-        * rbd_dev->watch_request), so we'll keep a reference to
-        * it.  We'll drop that reference (below) after we've
-        * unregistered it.
+        * rbd_dev->watch_request), so we'll keep a reference to it.
+        * We'll drop that reference after we've unregistered it in
+        * rbd_dev_header_unwatch_sync().
         */
-       if (start) {
-               rbd_dev->watch_request = obj_request;
+       rbd_dev->watch_request = obj_request;
 
-               return 0;
-       }
+       return 0;
+}
 
-       /* We have successfully torn down the watch request */
+/*
+ * Tear down a watch request, synchronously.
+ */
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+       struct rbd_obj_request *obj_request;
+
+       rbd_assert(rbd_dev->watch_event);
+       rbd_assert(rbd_dev->watch_request);
 
+       rbd_obj_request_end(rbd_dev->watch_request);
        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
-out_cancel:
-       /* Cancel the event if we're tearing down, or on error */
-       ceph_osdc_cancel_event(rbd_dev->watch_event);
-       rbd_dev->watch_event = NULL;
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
 
-       return ret;
-}
-
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
-{
-       return __rbd_dev_header_watch_sync(rbd_dev, true);
-}
-
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
-{
-       int ret;
+       obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+       if (!IS_ERR(obj_request))
+               rbd_obj_request_put(obj_request);
+       else
+               rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
+                        PTR_ERR(obj_request));
 
-       ret = __rbd_dev_header_watch_sync(rbd_dev, false);
-       if (ret) {
-               rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
-                        ret);
-       }
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
 }
 
 /*
@@ -3058,7 +3180,6 @@ static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;
 
@@ -3094,7 +3215,7 @@ static void rbd_request_fn(struct request_queue *q)
 
                if (write_request) {
                        result = -EROFS;
-                       if (read_only)
+                       if (rbd_dev->mapping.read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
@@ -3389,24 +3510,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        u64 mapping_size;
        int ret;
 
-       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        down_write(&rbd_dev->header_rwsem);
        mapping_size = rbd_dev->mapping.size;
-       if (rbd_dev->image_format == 1)
-               ret = rbd_dev_v1_header_info(rbd_dev);
-       else
-               ret = rbd_dev_v2_header_info(rbd_dev);
 
-       /* If it's a mapped snapshot, validate its EXISTS flag */
+       ret = rbd_dev_header_info(rbd_dev);
+       if (ret)
+               return ret;
+
+       /*
+        * If there is a parent, see if it has disappeared due to the
+        * mapped image getting flattened.
+        */
+       if (rbd_dev->parent) {
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       return ret;
+       }
+
+       if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
+               if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+                       rbd_dev->mapping.size = rbd_dev->header.image_size;
+       } else {
+               /* validate mapped snapshot's EXISTS flag */
+               rbd_exists_validate(rbd_dev);
+       }
 
-       rbd_exists_validate(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
 
-       if (mapping_size != rbd_dev->mapping.size) {
+       if (mapping_size != rbd_dev->mapping.size)
                rbd_dev_update_size(rbd_dev);
-       }
 
-       return ret;
+       return 0;
 }
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3568,46 +3702,36 @@ static ssize_t rbd_snap_show(struct device *dev,
 }
 
 /*
- * For an rbd v2 image, shows the pool id, image id, and snapshot id
- * for the parent image.  If there is no parent, simply shows
- * "(no parent image)".
+ * For a v2 image, shows the chain of parent images, separated by empty
+ * lines.  For v1 images or if there is no parent, shows "(no parent
+ * image)".
  */
 static ssize_t rbd_parent_show(struct device *dev,
-                            struct device_attribute *attr,
-                            char *buf)
+                              struct device_attribute *attr,
+                              char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       struct rbd_spec *spec = rbd_dev->parent_spec;
-       int count;
-       char *bufp = buf;
+       ssize_t count = 0;
 
-       if (!spec)
+       if (!rbd_dev->parent)
                return sprintf(buf, "(no parent image)\n");
 
-       count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
-                       (unsigned long long) spec->pool_id, spec->pool_name);
-       if (count < 0)
-               return count;
-       bufp += count;
-
-       count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
-                       spec->image_name ? spec->image_name : "(unknown)");
-       if (count < 0)
-               return count;
-       bufp += count;
-
-       count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
-                       (unsigned long long) spec->snap_id, spec->snap_name);
-       if (count < 0)
-               return count;
-       bufp += count;
-
-       count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
-       if (count < 0)
-               return count;
-       bufp += count;
+       for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
+               struct rbd_spec *spec = rbd_dev->parent_spec;
+
+               count += sprintf(&buf[count], "%s"
+                           "pool_id %llu\npool_name %s\n"
+                           "image_id %s\nimage_name %s\n"
+                           "snap_id %llu\nsnap_name %s\n"
+                           "overlap %llu\n",
+                           !count ? "" : "\n", /* first? */
+                           spec->pool_id, spec->pool_name,
+                           spec->image_id, spec->image_name ?: "(unknown)",
+                           spec->snap_id, spec->snap_name,
+                           rbd_dev->parent_overlap);
+       }
 
-       return (ssize_t) (bufp - buf);
+       return count;
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -3620,9 +3744,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
 
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
-               rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
+               return ret;
 
-       return ret < 0 ? ret : size;
+       return size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
@@ -3694,6 +3818,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
        if (!spec)
                return NULL;
+
+       spec->pool_id = CEPH_NOPOOL;
+       spec->snap_id = CEPH_NOSNAP;
        kref_init(&spec->kref);
 
        return spec;
@@ -3955,6 +4082,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
                parent_spec->snap_id = snap_id;
                rbd_dev->parent_spec = parent_spec;
                parent_spec = NULL;     /* rbd_dev now owns this */
+       } else {
+               kfree(image_id);
        }
 
        /*
@@ -4151,18 +4280,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 }
 
 /*
- * When an rbd image has a parent image, it is identified by the
- * pool, image, and snapshot ids (not names).  This function fills
- * in the names for those ids.  (It's OK if we can't figure out the
- * name for an image id, but the pool and snapshot ids should always
- * exist and have names.)  All names in an rbd spec are dynamically
- * allocated.
+ * An image being mapped will have everything but the snap id.
+ */
+static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
+{
+       struct rbd_spec *spec = rbd_dev->spec;
+
+       rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
+       rbd_assert(spec->image_id && spec->image_name);
+       rbd_assert(spec->snap_name);
+
+       if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+               u64 snap_id;
+
+               snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+               if (snap_id == CEPH_NOSNAP)
+                       return -ENOENT;
+
+               spec->snap_id = snap_id;
+       } else {
+               spec->snap_id = CEPH_NOSNAP;
+       }
+
+       return 0;
+}
+
+/*
+ * A parent image will have all ids but none of the names.
  *
- * When an image being mapped (not a parent) is probed, we have the
- * pool name and pool id, image name and image id, and the snapshot
- * name.  The only thing we're missing is the snapshot id.
+ * All names in an rbd spec are dynamically allocated.  It's OK if we
+ * can't figure out the name for an image id.
  */
-static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_spec *spec = rbd_dev->spec;
@@ -4171,24 +4320,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
        const char *snap_name;
        int ret;
 
-       /*
-        * An image being mapped will have the pool name (etc.), but
-        * we need to look up the snapshot id.
-        */
-       if (spec->pool_name) {
-               if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
-                       u64 snap_id;
-
-                       snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
-                       if (snap_id == CEPH_NOSNAP)
-                               return -ENOENT;
-                       spec->snap_id = snap_id;
-               } else {
-                       spec->snap_id = CEPH_NOSNAP;
-               }
-
-               return 0;
-       }
+       rbd_assert(spec->pool_id != CEPH_NOPOOL);
+       rbd_assert(spec->image_id);
+       rbd_assert(spec->snap_id != CEPH_NOSNAP);
 
        /* Get the pool name; we have to make our own copy of this */
 
@@ -4207,7 +4341,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
        if (!image_name)
                rbd_warn(rbd_dev, "unable to get image name");
 
-       /* Look up the snapshot name, and make a copy */
+       /* Fetch the snapshot name */
 
        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
        if (IS_ERR(snap_name)) {
@@ -4220,10 +4354,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
        spec->snap_name = snap_name;
 
        return 0;
+
 out_err:
        kfree(image_name);
        kfree(pool_name);
-
        return ret;
 }
 
@@ -4355,43 +4489,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
                        return ret;
        }
 
-       /*
-        * If the image supports layering, get the parent info.  We
-        * need to probe the first time regardless.  Thereafter we
-        * only need to if there's a parent, to see if it has
-        * disappeared due to the mapped image getting flattened.
-        */
-       if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
-                       (first_time || rbd_dev->parent_spec)) {
-               bool warn;
-
-               ret = rbd_dev_v2_parent_info(rbd_dev);
-               if (ret)
-                       return ret;
-
-               /*
-                * Print a warning if this is the initial probe and
-                * the image has a parent.  Don't print it if the
-                * image now being probed is itself a parent.  We
-                * can tell at this point because we won't know its
-                * pool name yet (just its pool id).
-                */
-               warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
-               if (first_time && warn)
-                       rbd_warn(rbd_dev, "WARNING: kernel layering "
-                                       "is EXPERIMENTAL!");
-       }
-
-       if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
-               if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-                       rbd_dev->mapping.size = rbd_dev->header.image_size;
-
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
 
        return ret;
 }
 
+static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+{
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_header_info(rbd_dev);
+
+       return rbd_dev_v2_header_info(rbd_dev);
+}
+
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
        struct device *dev;
@@ -4682,6 +4795,38 @@ out_err:
        return ret;
 }
 
+/*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+       u64 newest_epoch;
+       unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+       int tries = 0;
+       int ret;
+
+again:
+       ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+       if (ret == -ENOENT && tries++ < 1) {
+               ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+                                              &newest_epoch);
+               if (ret < 0)
+                       return ret;
+
+               if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+                       ceph_monc_request_next_osdmap(&rbdc->client->monc);
+                       (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+                                                    newest_epoch, timeout);
+                       goto again;
+               } else {
+                       /* the osdmap we have is new enough */
+                       return -ENOENT;
+               }
+       }
+
+       return ret;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
@@ -4752,7 +4897,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
@@ -4907,6 +5052,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
@@ -4994,8 +5140,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
-       rbd_assert(rbd_dev->spec->image_id);
-       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
@@ -5007,25 +5151,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
                        goto out_header_name;
        }
 
-       if (rbd_dev->image_format == 1)
-               ret = rbd_dev_v1_header_info(rbd_dev);
-       else
-               ret = rbd_dev_v2_header_info(rbd_dev);
+       ret = rbd_dev_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;
 
-       ret = rbd_dev_spec_update(rbd_dev);
+       /*
+        * If this image is the one being mapped, we have pool name and
+        * id, image name and id, and snap name - need to fill snap id.
+        * Otherwise this is a parent image, identified by pool, image
+        * and snap ids - need to fill in names for those ids.
+        */
+       if (mapping)
+               ret = rbd_spec_fill_snap_id(rbd_dev);
+       else
+               ret = rbd_spec_fill_names(rbd_dev);
        if (ret)
                goto err_out_probe;
 
+       if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       goto err_out_probe;
+
+               /*
+                * Need to warn users if this image is the one being
+                * mapped and has a parent.
+                */
+               if (mapping && rbd_dev->parent_spec)
+                       rbd_warn(rbd_dev,
+                                "WARNING: kernel layering is EXPERIMENTAL!");
+       }
+
        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;
 
        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);
-
        return 0;
+
 err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
@@ -5038,9 +5202,6 @@ err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
-
-       dout("probe failed, returning %d\n", ret);
-
        return ret;
 }
 
@@ -5053,7 +5214,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
-       struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;
 
@@ -5075,8 +5235,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        /* pick the pool */
-       osdc = &rbdc->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+       rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;
@@ -5387,6 +5546,7 @@ err_out_slab:
 
 static void __exit rbd_exit(void)
 {
+       ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);