/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:
   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot device names are the image device name plus this prefix;
 * the snapshot name itself must fit in the remainder of NAME_MAX. */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits as defined by the on-disk/OSD format. */
#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	/* NOTE(review): several fields appear to be missing from this
	 * extract (object_prefix, features, obj_order, etc.) along with
	 * the struct's closing brace -- confirm against the original. */

	/* The remaining fields need to be updated occasionally */
	struct ceph_snap_context *snapc;

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
	/* NOTE(review): the enclosing "struct rbd_client {" opening line
	 * (and likely a kref field) is not visible in this extract. */
	struct ceph_client	*client;
	struct list_head	node;

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
	/* NOTE(review): closing "};" of this enum is not visible here. */

	/* NOTE(review): these look like members of a separate flags enum
	 * (e.g. "enum obj_req_flags {") whose opening line is missing. */
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
struct rbd_obj_request {
	const char		*object_name;	/* target RADOS object */
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	struct rbd_obj_request	*obj_request;	/* STAT op */
	struct rbd_img_request	*img_request;
	/* links for img_request->obj_requests list */
	struct list_head	links;
	u32			which;		/* posn image request list */
	enum obj_request_type	type;
	struct bio		*bio_list;
	struct ceph_osd_request	*osd_req;
	u64			xferred;	/* bytes transferred */
	rbd_obj_callback_t	callback;
	struct completion	completion;
	/* NOTE(review): remaining fields (flags, result, kref, pages union)
	 * and the closing brace are not visible in this extract. */

	/* NOTE(review): these look like members of an image-request flags
	 * enum whose opening line is missing. */
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	u64			snap_id;	/* for reads */
	struct ceph_snap_context *snapc;	/* for writes */
	struct request		*rq;		/* block request */
	struct rbd_obj_request	*obj_request;	/* obj req initiator */
	spinlock_t		completion_lock;/* protects next_completion */
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */
	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */
	/* NOTE(review): flags/next_completion/kref fields and the closing
	 * brace are not visible in this extract. */

/* Iterate over the object requests belonging to an image request;
 * the _safe variant permits deletion while iterating (in reverse). */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
	/* NOTE(review): orphaned field -- its enclosing struct's opening
	 * line (probably "struct rbd_snap {") is not visible here. */
	struct list_head	node;

	/* NOTE(review): the "struct rbd_device {" opening line is not
	 * visible in this extract; the fields below belong to it. */
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;

	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	unsigned long		open_count;	/* protected by lock */

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
	/* NOTE(review): enum opening line ("enum rbd_dev_flags {") is not
	 * visible in this extract. */
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);
/* NOTE(review): the continuation lines of these two prototypes
 * (", size_t count);") are not visible in this extract. */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
static int rbd_dev_probe(struct rbd_device *rbd_dev);

/* sysfs bus attributes: /sys/bus/rbd/{add,remove} (write-only) */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),

/* NOTE(review): ".name" initializer and terminators appear missing. */
static struct bus_type rbd_bus_type = {
	.bus_attrs	= rbd_bus_attrs,

/* Empty release: rbd_root_dev is statically allocated. */
static void rbd_root_dev_release(struct device *dev)

static struct device rbd_root_dev = {
	.release        = rbd_root_dev_release,
385 static __printf(2, 3)
386 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
388 struct va_format vaf;
396 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
397 else if (rbd_dev->disk)
398 printk(KERN_WARNING "%s: %s: %pV\n",
399 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
400 else if (rbd_dev->spec && rbd_dev->spec->image_name)
401 printk(KERN_WARNING "%s: image %s: %pV\n",
402 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
403 else if (rbd_dev->spec && rbd_dev->spec->image_id)
404 printk(KERN_WARNING "%s: id %s: %pV\n",
405 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
407 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
408 RBD_DRV_NAME, rbd_dev, &vaf);
413 #define rbd_assert(expr) \
414 if (unlikely(!(expr))) { \
415 printk(KERN_ERR "\nAssertion failure in %s() " \
417 "\trbd_assert(%s);\n\n", \
418 __func__, __LINE__, #expr); \
421 #else /* !RBD_DEBUG */
422 # define rbd_assert(expr) ((void) 0)
423 #endif /* !RBD_DEBUG */
425 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
426 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
428 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
429 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
431 static int rbd_open(struct block_device *bdev, fmode_t mode)
433 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
434 bool removing = false;
436 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
439 spin_lock_irq(&rbd_dev->lock);
440 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
443 rbd_dev->open_count++;
444 spin_unlock_irq(&rbd_dev->lock);
448 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
449 (void) get_device(&rbd_dev->dev);
450 set_device_ro(bdev, rbd_dev->mapping.read_only);
451 mutex_unlock(&ctl_mutex);
456 static int rbd_release(struct gendisk *disk, fmode_t mode)
458 struct rbd_device *rbd_dev = disk->private_data;
459 unsigned long open_count_before;
461 spin_lock_irq(&rbd_dev->lock);
462 open_count_before = rbd_dev->open_count--;
463 spin_unlock_irq(&rbd_dev->lock);
464 rbd_assert(open_count_before > 0);
466 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
467 put_device(&rbd_dev->dev);
468 mutex_unlock(&ctl_mutex);
473 static const struct block_device_operations rbd_bd_ops = {
474 .owner = THIS_MODULE,
476 .release = rbd_release,
/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
	/* NOTE(review): local declarations, error-handling labels and all
	 * return statements are missing from this extract; the intended
	 * flow is create client -> open session -> add to client list,
	 * with goto-based cleanup on failure.  Confirm against original. */
	struct rbd_client *rbdc;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	/* NOTE(review): error path below (labels not visible). */
	ceph_destroy_client(rbdc->client);

	mutex_unlock(&ctl_mutex);

	ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);
529 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
531 kref_get(&rbdc->kref);
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
	/* NOTE(review): the declaration of "found" and several
	 * control-flow lines (early NULL return for CEPH_OPT_NOSHARE,
	 * found = true, break, closing braces) are missing from this
	 * extract -- confirm against the original. */
	struct rbd_client *client_node;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
	/* NOTE(review): these two comments are remnants of a mount-option
	 * token enum (Opt_*) whose member lines are missing here. */
	/* string args above */
	/* Boolean args above */

/* Token table mapping mount option strings to Opt_* values. */
static match_table_t rbd_opts_tokens = {
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */

#define RBD_READ_ONLY_DEFAULT	false

/*
 * match_token() callback: parse one mount option into *rbd_opts.
 */
static int parse_rbd_opts_token(char *c, void *private)
	/* NOTE(review): braces, the switch on token, and error returns are
	 * missing from this extract -- confirm against the original. */
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
			pr_err("bad mount option arg (not int) "
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
		dout("got token %d\n", token);

		rbd_opts->read_only = true;

		rbd_opts->read_only = false;
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Takes ownership of ceph_opts: an existing
 * client means the options are no longer needed and are destroyed.
 * Restored the else-branch and return lost in extraction.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
652 * Destroy ceph client
654 * Caller must hold rbd_client_list_lock.
656 static void rbd_client_release(struct kref *kref)
658 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
660 dout("%s: rbdc %p\n", __func__, rbdc);
661 spin_lock(&rbd_client_list_lock);
662 list_del(&rbdc->node);
663 spin_unlock(&rbd_client_list_lock);
665 ceph_destroy_client(rbdc->client);
670 * Drop reference to ceph client node. If it's not referenced anymore, release
673 static void rbd_put_client(struct rbd_client *rbdc)
676 kref_put(&rbdc->kref, rbd_client_release);
679 static bool rbd_image_format_valid(u32 image_format)
681 return image_format == 1 || image_format == 2;
/* Validate a v1 on-disk image header before trusting its contents. */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
	/* NOTE(review): local declarations, braces, and the
	 * "return false"/"return true" lines are missing from this
	 * extract -- confirm against the original. */

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
/*
 * Create a new header structure, translate header format from the on-disk
 * version.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
	/* NOTE(review): local declarations, several braces, all error
	 * labels (out_err) and return statements are missing from this
	 * extract.  The intended flow: copy object prefix, copy snapshot
	 * names and sizes (if any), fill in options, then build the
	 * snapshot context; on any allocation failure free everything
	 * allocated so far.  Confirm against the original. */

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);

		/* NOTE(review): else-branch (snap_count == 0). */
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	/* NOTE(review): error-unwind path below (labels not visible). */
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;
814 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
816 struct rbd_snap *snap;
818 if (snap_id == CEPH_NOSNAP)
819 return RBD_SNAP_HEAD_NAME;
821 list_for_each_entry(snap, &rbd_dev->snaps, node)
822 if (snap_id == snap->id)
828 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
831 struct rbd_snap *snap;
833 list_for_each_entry(snap, &rbd_dev->snaps, node) {
834 if (!strcmp(snap_name, snap->name)) {
835 rbd_dev->spec->snap_id = snap->id;
836 rbd_dev->mapping.size = snap->size;
837 rbd_dev->mapping.features = snap->features;
/* Establish size/features/read-only state for the mapped snapshot
 * (or the image head if no snapshot was named). */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
	/* NOTE(review): braces, the else-branch structure, error return
	 * and final "return ret"/"return 0" are missing from this
	 * extract -- confirm against the original.  Mapping a named
	 * snapshot forces the device read-only. */
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;

		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);

		rbd_dev->mapping.read_only = true;

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
868 static void rbd_header_free(struct rbd_image_header *header)
870 kfree(header->object_prefix);
871 header->object_prefix = NULL;
872 kfree(header->snap_sizes);
873 header->snap_sizes = NULL;
874 kfree(header->snap_names);
875 header->snap_names = NULL;
876 ceph_put_snap_context(header->snapc);
877 header->snapc = NULL;
/* Build the RADOS object name ("<prefix>.<segment>") covering the
 * given image byte offset.  Caller frees the returned string. */
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
	/* NOTE(review): local declarations, the allocation-failure check,
	 * the error cleanup (kfree + NULL) and the final return are
	 * missing from this extract -- confirm against the original. */
	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);

	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
902 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
904 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
906 return offset & (segment_size - 1);
909 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
910 u64 offset, u64 length)
912 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
914 offset &= segment_size - 1;
916 rbd_assert(length <= U64_MAX - offset);
917 if (offset + length > segment_size)
918 length = segment_size - offset;
924 * returns the size of an object in the image
926 static u64 rbd_obj_bytes(struct rbd_image_header *header)
928 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain. */
static void bio_chain_put(struct bio *chain)
	/* NOTE(review): the loop structure (while, temporary, bio_put)
	 * is missing from this extract -- confirm against original. */
		chain = chain->bi_next;

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
	/* NOTE(review): local declarations, the outer while loop and the
	 * "pos += bv->bv_len" accounting are missing from this extract. */
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);

		chain = chain->bi_next;

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
	/* NOTE(review): opening brace, local declarations (flags, kaddr,
	 * page_offset, length) and the loop-advance lines (offset +=
	 * length; page++) are missing from this extract. */
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
	/* NOTE(review): remaining parameters (len, gfpmask), opening brace,
	 * several local declarations, loop bodies/breaks, voff computation
	 * and the trailing else/return are missing from this extract --
	 * confirm against the original before relying on it. */
	unsigned short end_idx;
	unsigned short vcnt;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
	if (WARN_ON_ONCE(len > bio_src->bi_size))
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))

	/* Find first affected segment... */

	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
		resid -= bv->bv_len;

	/* ...and the last affected segment */

	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
		resid -= bv->bv_len;
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
		vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
		bio->bi_io_vec[0].bv_len = len;

	bio->bi_vcnt = vcnt;

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
	/* NOTE(review): remaining parameters (len, gfpmask), opening
	 * brace, the "end" pointer, the while loop structure, the
	 * advance-to-next-bio logic, the in-out updates of *bio_src and
	 * *offset, and the normal/error returns are missing from this
	 * extract -- confirm against the original. */
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

		unsigned int bi_size;

			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
			goto out_err;	/* ENOMEM */

		end = &bio->bi_next;

		if (off == bi->bi_size) {

	bio_chain_put(chain);
1149 * The default/initial value for all object request flags is 0. For
1150 * each flag, once its value is set to 1 it is never reset to 0
1153 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1155 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1156 struct rbd_device *rbd_dev;
1158 rbd_dev = obj_request->img_request->rbd_dev;
1159 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1164 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1167 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1170 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1172 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1173 struct rbd_device *rbd_dev = NULL;
1175 if (obj_request_img_data_test(obj_request))
1176 rbd_dev = obj_request->img_request->rbd_dev;
1177 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1182 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1189 * This sets the KNOWN flag after (possibly) setting the EXISTS
1190 * flag. The latter is set based on the "exists" value provided.
1192 * Note that for our purposes once an object exists it never goes
1193 * away again. It's possible that the response from two existence
1194 * checks are separated by the creation of the target object, and
1195 * the first ("doesn't exist") response arrives *after* the second
1196 * ("does exist"). In that case we ignore the second one.
1198 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1202 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1203 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1207 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1210 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1213 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1216 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1219 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1221 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1222 atomic_read(&obj_request->kref.refcount));
1223 kref_get(&obj_request->kref);
1226 static void rbd_obj_request_destroy(struct kref *kref);
1227 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1229 rbd_assert(obj_request != NULL);
1230 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1231 atomic_read(&obj_request->kref.refcount));
1232 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1235 static void rbd_img_request_get(struct rbd_img_request *img_request)
1237 dout("%s: img %p (was %d)\n", __func__, img_request,
1238 atomic_read(&img_request->kref.refcount));
1239 kref_get(&img_request->kref);
1242 static void rbd_img_request_destroy(struct kref *kref);
1243 static void rbd_img_request_put(struct rbd_img_request *img_request)
1245 rbd_assert(img_request != NULL);
1246 dout("%s: img %p (was %d)\n", __func__, img_request,
1247 atomic_read(&img_request->kref.refcount));
1248 kref_put(&img_request->kref, rbd_img_request_destroy);
1251 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1252 struct rbd_obj_request *obj_request)
1254 rbd_assert(obj_request->img_request == NULL);
1256 /* Image request now owns object's original reference */
1257 obj_request->img_request = img_request;
1258 obj_request->which = img_request->obj_request_count;
1259 rbd_assert(!obj_request_img_data_test(obj_request));
1260 obj_request_img_data_set(obj_request);
1261 rbd_assert(obj_request->which != BAD_WHICH);
1262 img_request->obj_request_count++;
1263 list_add_tail(&obj_request->links, &img_request->obj_requests);
1264 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1265 obj_request->which);
1268 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1269 struct rbd_obj_request *obj_request)
1271 rbd_assert(obj_request->which != BAD_WHICH);
1273 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1274 obj_request->which);
1275 list_del(&obj_request->links);
1276 rbd_assert(img_request->obj_request_count > 0);
1277 img_request->obj_request_count--;
1278 rbd_assert(obj_request->which == img_request->obj_request_count);
1279 obj_request->which = BAD_WHICH;
1280 rbd_assert(obj_request_img_data_test(obj_request));
1281 rbd_assert(obj_request->img_request == img_request);
1282 obj_request->img_request = NULL;
1283 obj_request->callback = NULL;
1284 rbd_obj_request_put(obj_request);
1287 static bool obj_request_type_valid(enum obj_request_type type)
1290 case OBJ_REQUEST_NODATA:
1291 case OBJ_REQUEST_BIO:
1292 case OBJ_REQUEST_PAGES:
1299 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1300 struct rbd_obj_request *obj_request)
1302 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1304 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1307 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1310 dout("%s: img %p\n", __func__, img_request);
1313 * If no error occurred, compute the aggregate transfer
1314 * count for the image request. We could instead use
1315 * atomic64_cmpxchg() to update it as each object request
1316 * completes; not clear which way is better off hand.
1318 if (!img_request->result) {
1319 struct rbd_obj_request *obj_request;
1322 for_each_obj_request(img_request, obj_request)
1323 xferred += obj_request->xferred;
1324 img_request->xferred = xferred;
1327 if (img_request->callback)
1328 img_request->callback(img_request);
1330 rbd_img_request_put(img_request);
1333 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1335 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1337 dout("%s: obj %p\n", __func__, obj_request);
1339 return wait_for_completion_interruptible(&obj_request->completion);
1343 * The default/initial value for all image request flags is 0. Each
1344 * is conditionally set to 1 at image request initialization time
1345 * and currently never change thereafter.
1347 static void img_request_write_set(struct rbd_img_request *img_request)
1349 set_bit(IMG_REQ_WRITE, &img_request->flags);
1353 static bool img_request_write_test(struct rbd_img_request *img_request)
1356 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1359 static void img_request_child_set(struct rbd_img_request *img_request)
1361 set_bit(IMG_REQ_CHILD, &img_request->flags);
1365 static bool img_request_child_test(struct rbd_img_request *img_request)
1368 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
/* Mark an image request as operating on a layered (cloned) image. */
1371 static void img_request_layered_set(struct rbd_img_request *img_request)
1373 set_bit(IMG_REQ_LAYERED, &img_request->flags);
/* Return true if the image request operates on a layered (cloned) image. */
1377 static bool img_request_layered_test(struct rbd_img_request *img_request)
1380 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Completion handling for a read object request that is part of an
 * image request.  Normalizes "hole" (-ENOENT) and short-read results
 * by zero-filling the unread portion of the data buffer (bio chain or
 * page vector, depending on request type) and reporting the full
 * requested length as transferred, then marks the request done.
 */
1384 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1386 u64 xferred = obj_request->xferred;
1387 u64 length = obj_request->length;
1389 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1390 obj_request, obj_request->img_request, obj_request->result,
1393 * ENOENT means a hole in the image. We zero-fill the
1394 * entire length of the request. A short read also implies
1395 * zero-fill to the end of the request. Either way we
1396 * update the xferred count to indicate the whole request
/* a NODATA request carries no buffer to zero-fill */
1399 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1400 if (obj_request->result == -ENOENT) {
1401 if (obj_request->type == OBJ_REQUEST_BIO)
1402 zero_bio_chain(obj_request->bio_list, 0)
1404 zero_pages(obj_request->pages, 0, length);
/* a hole is not an error from the caller's point of view */
1405 obj_request->result = 0;
1406 obj_request->xferred = length;
1407 } else if (xferred < length && !obj_request->result) {
1408 if (obj_request->type == OBJ_REQUEST_BIO)
1409 zero_bio_chain(obj_request->bio_list, xferred);
1411 zero_pages(obj_request->pages, xferred, length);
1412 obj_request->xferred = length;
1414 obj_request_done_set(obj_request);
/*
 * Complete an object request: invoke its callback (if any) and wake
 * all waiters blocked in rbd_obj_request_wait().
 */
1417 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1419 dout("%s: obj %p cb %p\n", __func__, obj_request,
1420 obj_request->callback);
1421 if (obj_request->callback)
1422 obj_request->callback(obj_request);
1424 complete_all(&obj_request->completion);
/* Trivial OSD op completion: nothing to post-process, just mark done. */
1427 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1429 dout("%s: obj %p\n", __func__, obj_request);
1430 obj_request_done_set(obj_request);
/*
 * Completion handling for an OSD read op.  If the object belongs to a
 * layered image and came back -ENOENT, the data lives in the parent
 * image, so redirect the read there; otherwise apply the image-read
 * fixups (zero-fill) or, for a bare object read, just mark it done.
 */
1433 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1435 struct rbd_img_request *img_request = NULL;
1436 bool layered = false;
1438 if (obj_request_img_data_test(obj_request)) {
1439 img_request = obj_request->img_request;
1440 layered = img_request && img_request_layered_test(img_request);
1446 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1447 obj_request, img_request, obj_request->result,
1448 obj_request->xferred, obj_request->length);
/* -ENOENT on a layered image: the object only exists in the parent */
1449 if (layered && obj_request->result == -ENOENT)
1450 rbd_img_parent_read(obj_request);
1451 else if (img_request)
1452 rbd_img_obj_request_read_callback(obj_request);
1454 obj_request_done_set(obj_request);
/*
 * Completion handling for an OSD write op.  A successful write always
 * covers the full requested length, so report that and mark done.
 */
1457 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1459 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1460 obj_request->result, obj_request->length);
1462 * There is no such thing as a successful short write. Set
1463 * it to our originally-requested length.
1465 obj_request->xferred = obj_request->length;
1466 obj_request_done_set(obj_request);
1470 * For a simple stat call there's nothing to do. We'll do more if
1471 * this is part of a write sequence for a layered image.
1473 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1475 dout("%s: obj %p\n", __func__, obj_request);
1476 obj_request_done_set(obj_request);
/*
 * Main completion callback for all rbd OSD requests.  Records result,
 * version and transfer count from the reply, dispatches on the opcode
 * of the (single) op to per-op handlers, and completes the object
 * request if the handler marked it done.
 */
1479 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1480 struct ceph_msg *msg)
1482 struct rbd_obj_request *obj_request = osd_req->r_priv;
1485 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1486 rbd_assert(osd_req == obj_request->osd_req);
/* image-data requests are part of an image request and have a slot */
1487 if (obj_request_img_data_test(obj_request)) {
1488 rbd_assert(obj_request->img_request);
1489 rbd_assert(obj_request->which != BAD_WHICH);
1491 rbd_assert(obj_request->which == BAD_WHICH);
1494 if (osd_req->r_result < 0)
1495 obj_request->result = osd_req->r_result;
1496 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
/* rbd currently builds only single-op OSD requests */
1498 WARN_ON(osd_req->r_num_ops != 1); /* For now */
1501 * We support a 64-bit length, but ultimately it has to be
1502 * passed to blk_end_request(), which takes an unsigned int.
1504 obj_request->xferred = osd_req->r_reply_op_len[0];
1505 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1506 opcode = osd_req->r_ops[0].op;
1508 case CEPH_OSD_OP_READ:
1509 rbd_osd_read_callback(obj_request);
1511 case CEPH_OSD_OP_WRITE:
1512 rbd_osd_write_callback(obj_request);
1514 case CEPH_OSD_OP_STAT:
1515 rbd_osd_stat_callback(obj_request);
1517 case CEPH_OSD_OP_CALL:
1518 case CEPH_OSD_OP_NOTIFY_ACK:
1519 case CEPH_OSD_OP_WATCH:
/* these need no reply post-processing */
1520 rbd_osd_trivial_callback(obj_request);
1523 rbd_warn(NULL, "%s: unsupported op %hu\n",
1524 obj_request->object_name, (unsigned short) opcode);
1528 if (obj_request_done_test(obj_request))
1529 rbd_obj_request_complete(obj_request);
/*
 * Finalize an OSD request for a read: no snapshot context, and the
 * snapshot id comes from the image request (or CEPH_NOSNAP for a
 * stand-alone object read).
 */
1532 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1534 struct rbd_img_request *img_request = obj_request->img_request;
1535 struct ceph_osd_request *osd_req = obj_request->osd_req;
1538 rbd_assert(osd_req != NULL);
1540 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1541 ceph_osdc_build_request(osd_req, obj_request->offset,
1542 NULL, snap_id, NULL);
/*
 * Finalize an OSD request for a write: supply the image request's
 * snapshot context (NULL for a stand-alone object write) and the
 * current time as the modification time.
 */
1545 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1547 struct rbd_img_request *img_request = obj_request->img_request;
1548 struct ceph_osd_request *osd_req = obj_request->osd_req;
1549 struct ceph_snap_context *snapc;
1550 struct timespec mtime = CURRENT_TIME;
1552 rbd_assert(osd_req != NULL);
1554 snapc = img_request ? img_request->snapc : NULL;
1555 ceph_osdc_build_request(osd_req, obj_request->offset,
1556 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate and partially initialize a single-op OSD request for the
 * given object request: sets read/write flags, completion callback,
 * object id and file layout.  Returns NULL on allocation failure.
 * Caller still has to set up the op and call one of the
 * rbd_osd_req_format_*() helpers.
 */
1559 static struct ceph_osd_request *rbd_osd_req_create(
1560 struct rbd_device *rbd_dev,
1562 struct rbd_obj_request *obj_request)
1564 struct ceph_snap_context *snapc = NULL;
1565 struct ceph_osd_client *osdc;
1566 struct ceph_osd_request *osd_req;
/* image-data writes need the image's snapshot context */
1568 if (obj_request_img_data_test(obj_request)) {
1569 struct rbd_img_request *img_request = obj_request->img_request;
1571 rbd_assert(write_request ==
1572 img_request_write_test(img_request));
1574 snapc = img_request->snapc;
1577 /* Allocate and initialize the request, for the single op */
1579 osdc = &rbd_dev->rbd_client->client->osdc;
1580 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1582 return NULL; /* ENOMEM */
1585 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1587 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1589 osd_req->r_callback = rbd_osd_req_callback;
1590 osd_req->r_priv = obj_request;
/* copy the object name into the request's oid (bounds-checked) */
1592 osd_req->r_oid_len = strlen(obj_request->object_name);
1593 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1594 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1596 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Drop the reference on an OSD request created by rbd_osd_req_create(). */
1601 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1603 ceph_osdc_put_request(osd_req);
1606 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate and initialize an object request of the given type for the
 * byte range [offset, offset+length) of the named object.  The object
 * name is copied into the same allocation, just past the struct.
 * Returned request has a single reference; release with
 * rbd_obj_request_put().
 */
1608 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1609 u64 offset, u64 length,
1610 enum obj_request_type type)
1612 struct rbd_obj_request *obj_request;
1616 rbd_assert(obj_request_type_valid(type));
/* +1 for the terminating NUL */
1618 size = strlen(object_name) + 1;
1619 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
/* name storage immediately follows the struct in the allocation */
1623 name = (char *)(obj_request + 1);
1624 obj_request->object_name = memcpy(name, object_name, size);
1625 obj_request->offset = offset;
1626 obj_request->length = length;
1627 obj_request->flags = 0;
/* BAD_WHICH: not (yet) part of any image request */
1628 obj_request->which = BAD_WHICH;
1629 obj_request->type = type;
1630 INIT_LIST_HEAD(&obj_request->links);
1631 init_completion(&obj_request->completion);
1632 kref_init(&obj_request->kref);
1634 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1635 offset, length, (int)type, obj_request);
/*
 * kref release function for an object request.  Must only run after
 * the request has been removed from its image request.  Releases the
 * OSD request and any attached data (bio chain or page vector).
 */
1640 static void rbd_obj_request_destroy(struct kref *kref)
1642 struct rbd_obj_request *obj_request;
1644 obj_request = container_of(kref, struct rbd_obj_request, kref);
1646 dout("%s: obj %p\n", __func__, obj_request);
/* must already be detached from any image request */
1648 rbd_assert(obj_request->img_request == NULL);
1649 rbd_assert(obj_request->which == BAD_WHICH);
1651 if (obj_request->osd_req)
1652 rbd_osd_req_destroy(obj_request->osd_req);
1654 rbd_assert(obj_request_type_valid(obj_request->type));
1655 switch (obj_request->type) {
1656 case OBJ_REQUEST_NODATA:
1657 break; /* Nothing to do */
1658 case OBJ_REQUEST_BIO:
1659 if (obj_request->bio_list)
1660 bio_chain_put(obj_request->bio_list);
1662 case OBJ_REQUEST_PAGES:
1663 if (obj_request->pages)
1664 ceph_release_page_vector(obj_request->pages,
1665 obj_request->page_count);
1673 * Caller is responsible for filling in the list of object requests
1674 * that comprises the image request, and the Linux request pointer
1675 * (if there is one).
/*
 * Allocate and initialize an image request covering the byte range
 * [offset, offset+length).  For a write, takes a reference on the
 * device's current snapshot context (under header_rwsem); for a read,
 * records the mapped snapshot id.  Returns NULL on failure.
 */
1677 static struct rbd_img_request *rbd_img_request_create(
1678 struct rbd_device *rbd_dev,
1679 u64 offset, u64 length,
1683 struct rbd_img_request *img_request;
1684 struct ceph_snap_context *snapc = NULL;
1686 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1690 if (write_request) {
/* snapshot context is protected by the header semaphore */
1691 down_read(&rbd_dev->header_rwsem);
1692 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1693 up_read(&rbd_dev->header_rwsem);
1694 if (WARN_ON(!snapc)) {
1696 return NULL; /* Shouldn't happen */
1701 img_request->rq = NULL;
1702 img_request->rbd_dev = rbd_dev;
1703 img_request->offset = offset;
1704 img_request->length = length;
1705 img_request->flags = 0;
1706 if (write_request) {
1707 img_request_write_set(img_request);
1708 img_request->snapc = snapc;
1710 img_request->snap_id = rbd_dev->spec->snap_id;
1713 img_request_child_set(img_request);
/* images with a parent spec are layered clones */
1714 if (rbd_dev->parent_spec)
1715 img_request_layered_set(img_request);
1716 spin_lock_init(&img_request->completion_lock);
1717 img_request->next_completion = 0;
1718 img_request->callback = NULL;
1719 img_request->result = 0;
1720 img_request->obj_request_count = 0;
1721 INIT_LIST_HEAD(&img_request->obj_requests);
1722 kref_init(&img_request->kref);
1724 rbd_img_request_get(img_request); /* Avoid a warning */
1725 rbd_img_request_put(img_request); /* TEMPORARY */
1727 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1728 write_request ? "write" : "read", offset, length,
/*
 * kref release function for an image request.  Detaches and drops all
 * constituent object requests, releases the snapshot context (writes)
 * and, for a child request, the parent object request reference.
 */
1734 static void rbd_img_request_destroy(struct kref *kref)
1736 struct rbd_img_request *img_request;
1737 struct rbd_obj_request *obj_request;
1738 struct rbd_obj_request *next_obj_request;
1740 img_request = container_of(kref, struct rbd_img_request, kref);
1742 dout("%s: img %p\n", __func__, img_request);
1744 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1745 rbd_img_obj_request_del(img_request, obj_request);
1746 rbd_assert(img_request->obj_request_count == 0);
/* only writes hold a snapshot context reference */
1748 if (img_request_write_test(img_request))
1749 ceph_put_snap_context(img_request->snapc);
1751 if (img_request_child_test(img_request))
1752 rbd_obj_request_put(img_request->obj_request);
/*
 * Account one finished object request against its image request.
 * Logs failures and records the first error in the image request.
 * Returns true if more of the image request remains incomplete:
 * determined positionally for a child request, or by blk_end_request()
 * for a block-layer-originated request.
 */
1757 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1759 struct rbd_img_request *img_request;
1760 unsigned int xferred;
1764 rbd_assert(obj_request_img_data_test(obj_request));
1765 img_request = obj_request->img_request;
/* xferred must fit in the unsigned int blk_end_request() takes */
1767 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1768 xferred = (unsigned int)obj_request->xferred;
1769 result = obj_request->result;
1771 struct rbd_device *rbd_dev = img_request->rbd_dev;
1773 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1774 img_request_write_test(img_request) ? "write" : "read",
1775 obj_request->length, obj_request->img_offset,
1776 obj_request->offset);
1777 rbd_warn(rbd_dev, " result %d xferred %x\n",
/* only the first error is preserved for the image request */
1779 if (!img_request->result)
1780 img_request->result = result;
1783 if (img_request_child_test(img_request)) {
1784 rbd_assert(img_request->obj_request != NULL);
1785 more = obj_request->which < img_request->obj_request_count - 1;
1787 rbd_assert(img_request->rq != NULL);
1788 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Per-object completion callback for image requests.  Object requests
 * must be retired in order; under the completion lock, walk forward
 * from the expected next slot ending each contiguously-done request,
 * and complete the whole image request once nothing more remains.
 */
1794 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1796 struct rbd_img_request *img_request;
1797 u32 which = obj_request->which;
1800 rbd_assert(obj_request_img_data_test(obj_request));
1801 img_request = obj_request->img_request;
1803 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1804 rbd_assert(img_request != NULL);
1805 rbd_assert(img_request->obj_request_count > 0);
1806 rbd_assert(which != BAD_WHICH);
1807 rbd_assert(which < img_request->obj_request_count);
1808 rbd_assert(which >= img_request->next_completion);
1810 spin_lock_irq(&img_request->completion_lock);
/* an earlier slot hasn't completed yet; it will retire us later */
1811 if (which != img_request->next_completion)
1814 for_each_obj_request_from(img_request, obj_request) {
1816 rbd_assert(which < img_request->obj_request_count);
1818 if (!obj_request_done_test(obj_request))
1820 more = rbd_img_obj_end_request(obj_request);
/* "more" and "reached the end" must agree */
1824 rbd_assert(more ^ (which == img_request->obj_request_count));
1825 img_request->next_completion = which;
1827 spin_unlock_irq(&img_request->completion_lock);
1830 rbd_img_request_complete(img_request);
/*
 * Split a bio chain across rbd objects: for each object segment the
 * image request touches, clone the covering portion of the bio chain,
 * build an object request with a single read or write extent op, and
 * add it to the image request.  Returns 0 on success; on failure all
 * object requests created so far are released and -ENOMEM-style
 * cleanup paths are taken (error value set in the elided lines).
 */
1833 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1834 struct bio *bio_list)
1836 struct rbd_device *rbd_dev = img_request->rbd_dev;
1837 struct rbd_obj_request *obj_request = NULL;
1838 struct rbd_obj_request *next_obj_request;
1839 bool write_request = img_request_write_test(img_request);
1840 unsigned int bio_offset;
1845 dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1847 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1849 img_offset = img_request->offset;
/* the bio chain must start exactly at the image request offset */
1850 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1851 resid = img_request->length;
1852 rbd_assert(resid > 0);
1854 struct ceph_osd_request *osd_req;
1855 const char *object_name;
1856 unsigned int clone_size;
1860 object_name = rbd_segment_name(rbd_dev, img_offset);
/* offset/length of this chunk within its rbd object */
1863 offset = rbd_segment_offset(rbd_dev, img_offset);
1864 length = rbd_segment_length(rbd_dev, img_offset, resid);
1865 obj_request = rbd_obj_request_create(object_name,
1868 kfree(object_name); /* object request has its own copy */
1872 rbd_assert(length <= (u64) UINT_MAX);
1873 clone_size = (unsigned int) length;
/* clone just the part of the bio chain covering this object */
1874 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1875 &bio_offset, clone_size,
1877 if (!obj_request->bio_list)
1880 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1884 obj_request->osd_req = osd_req;
1885 obj_request->callback = rbd_img_obj_callback;
1887 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1889 osd_req_op_extent_osd_data_bio(osd_req, 0,
1890 obj_request->bio_list, obj_request->length);
1893 rbd_osd_req_format_write(obj_request);
1895 rbd_osd_req_format_read(obj_request);
1897 obj_request->img_offset = img_offset;
1898 rbd_img_obj_request_add(img_request, obj_request);
1900 img_offset += length;
/* error path: drop the partially-built object request ... */
1907 rbd_obj_request_put(obj_request);
/* ... and everything already added to the image request */
1909 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1910 rbd_obj_request_put(obj_request);
/*
 * Completion callback for the STAT existence probe issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists in the original request, then resubmits that request (or
 * completes it with the error if the probe failed unexpectedly).
 */
1915 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
1917 struct rbd_obj_request *orig_request;
1920 rbd_assert(!obj_request_img_data_test(obj_request));
1923 * All we need from the object request is the original
1924 * request and the result of the STAT op. Grab those, then
1925 * we're done with the request.
1927 orig_request = obj_request->obj_request;
1928 obj_request->obj_request = NULL;
1929 rbd_assert(orig_request);
1930 rbd_assert(orig_request->img_request);
1932 result = obj_request->result;
1933 obj_request->result = 0;
1935 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
1936 obj_request, orig_request, result,
1937 obj_request->xferred, obj_request->length);
/* the stat request itself is no longer needed */
1938 rbd_obj_request_put(obj_request);
1940 rbd_assert(orig_request);
1941 rbd_assert(orig_request->img_request);
1944 * Our only purpose here is to determine whether the object
1945 * exists, and we don't want to treat the non-existence as
1946 * an error. If something else comes back, transfer the
1947 * error to the original request and complete it now.
1950 obj_request_existence_set(orig_request, true);
1951 } else if (result == -ENOENT) {
1952 obj_request_existence_set(orig_request, false);
1953 } else if (result) {
1954 orig_request->result = result;
1959 * Resubmit the original request now that we have recorded
1960 * whether the target object exists.
1962 orig_request->result = rbd_img_obj_request_submit(orig_request);
1964 if (orig_request->result)
1965 rbd_obj_request_complete(orig_request);
1966 rbd_obj_request_put(orig_request);
/*
 * Issue a STAT op against the target object of obj_request to learn
 * whether it exists, before deciding how to handle a layered write.
 * The stat request holds a reference to obj_request and resubmits it
 * from rbd_img_obj_exists_callback().  Returns 0 or a negative errno.
 */
1969 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
1971 struct rbd_obj_request *stat_request;
1972 struct rbd_device *rbd_dev;
1973 struct ceph_osd_client *osdc;
1974 struct page **pages = NULL;
1980 * The response data for a STAT call consists of:
/* size (__le64) + timespec (__le32 sec + __le32 nsec) */
1987 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
1988 page_count = (u32)calc_pages_for(0, size);
1989 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1991 return PTR_ERR(pages);
/* stat probe targets the same object name; no data range */
1994 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
/* the stat request pins the original until its callback runs */
1999 rbd_obj_request_get(obj_request);
2000 stat_request->obj_request = obj_request;
2001 stat_request->pages = pages;
2002 stat_request->page_count = page_count;
2004 rbd_assert(obj_request->img_request);
2005 rbd_dev = obj_request->img_request->rbd_dev;
2006 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2008 if (!stat_request->osd_req)
2010 stat_request->callback = rbd_img_obj_exists_callback;
2012 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2013 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2015 rbd_osd_req_format_read(stat_request);
2017 osdc = &rbd_dev->rbd_client->client->osdc;
2018 ret = rbd_obj_request_submit(osdc, stat_request);
/* error path: undo the reference taken above */
2021 rbd_obj_request_put(obj_request);
/*
 * Submit one object request belonging to an image request.  Plain
 * reads, non-layered writes, and layered writes whose target object
 * is already known go straight to the OSD; a layered write to an
 * object of unknown existence first issues an existence check, which
 * resubmits this request when it completes.
 */
2026 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2028 struct rbd_img_request *img_request;
2030 rbd_assert(obj_request_img_data_test(obj_request));
2032 img_request = obj_request->img_request;
2033 rbd_assert(img_request);
2035 /* (At the moment we don't care whether it exists or not...) */
/* reference the symbol to silence "defined but not used" */
2036 (void) obj_request_exists_test;
2039 * Only layered writes need special handling. If it's not a
2040 * layered write, or it is a layered write but we know the
2041 * target object exists, it's no different from any other
2044 if (!img_request_write_test(img_request) ||
2045 !img_request_layered_test(img_request) ||
2046 obj_request_known_test(obj_request)) {
2048 struct rbd_device *rbd_dev;
2049 struct ceph_osd_client *osdc;
2051 rbd_dev = obj_request->img_request->rbd_dev;
2052 osdc = &rbd_dev->rbd_client->client->osdc;
2054 return rbd_obj_request_submit(osdc, obj_request);
2058 * It's a layered write and we don't know whether the target
2059 * exists. Issue existence check; once that completes the
2060 * original request will be submitted again.
2063 return rbd_img_obj_exists_submit(obj_request);
/*
 * Submit every object request that makes up an image request.
 * (Submission result handling is in the elided lines of the loop.)
 */
2066 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2068 struct rbd_obj_request *obj_request;
2069 struct rbd_obj_request *next_obj_request;
2071 dout("%s: img %p\n", __func__, img_request);
2072 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2075 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for a parent-image read (child image request).
 * Propagates result and transfer count back to the originating object
 * request, applies read fixups (zero-fill), and completes it.
 */
2083 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2085 struct rbd_obj_request *obj_request;
2087 rbd_assert(img_request_child_test(img_request));
2089 obj_request = img_request->obj_request;
2090 rbd_assert(obj_request != NULL);
2091 obj_request->result = img_request->result;
2092 obj_request->xferred = img_request->xferred;
2094 rbd_img_obj_request_read_callback(obj_request);
2095 rbd_obj_request_complete(obj_request);
/*
 * The object was absent from the child image (-ENOENT on a layered
 * read): read the same image range from the parent image instead by
 * building a child image request over the original bio chain.  On any
 * failure, the error is stored in the object request and it is marked
 * done (completion handled by the elided error path).
 */
2098 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2100 struct rbd_device *rbd_dev;
2101 struct rbd_img_request *img_request;
2104 rbd_assert(obj_request_img_data_test(obj_request));
2105 rbd_assert(obj_request->img_request != NULL);
2106 rbd_assert(obj_request->result == (s32) -ENOENT);
2107 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2109 rbd_dev = obj_request->img_request->rbd_dev;
2110 rbd_assert(rbd_dev->parent != NULL);
2111 /* rbd_read_finish(obj_request, obj_request->length); */
/* child request targets the parent device over the same range */
2112 img_request = rbd_img_request_create(rbd_dev->parent,
2113 obj_request->img_offset,
2114 obj_request->length,
/* the child image request pins the originating object request */
2120 rbd_obj_request_get(obj_request);
2121 img_request->obj_request = obj_request;
2123 result = rbd_img_request_fill_bio(img_request, obj_request->bio_list);
2127 img_request->callback = rbd_img_parent_read_callback;
2128 result = rbd_img_request_submit(img_request);
/* error path: unwind and report failure through the object request */
2135 rbd_img_request_put(img_request);
2136 obj_request->result = result;
2137 obj_request->xferred = 0;
2138 obj_request_done_set(obj_request);
/*
 * Acknowledge a watch notification on the device's header object by
 * sending a NOTIFY_ACK op.  The request completes asynchronously; its
 * callback just drops the request reference.  Returns 0 or negative
 * errno from submission.
 */
2141 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2142 u64 ver, u64 notify_id)
2144 struct rbd_obj_request *obj_request;
2145 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2148 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2149 OBJ_REQUEST_NODATA);
2154 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2155 if (!obj_request->osd_req)
/* fire-and-forget: completion just releases the request */
2157 obj_request->callback = rbd_obj_request_put;
2159 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2161 rbd_osd_req_format_read(obj_request);
2163 ret = rbd_obj_request_submit(osdc, obj_request);
2166 rbd_obj_request_put(obj_request);
/*
 * Watch event callback for the header object: a notification means
 * the image header may have changed, so refresh it, then acknowledge
 * the notification.
 */
2171 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2173 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2180 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2181 rbd_dev->header_name, (unsigned long long) notify_id,
2182 (unsigned int) opcode);
2183 rc = rbd_dev_refresh(rbd_dev, &hver);
2185 rbd_warn(rbd_dev, "got notification but failed to "
2186 " update snaps: %d\n", rc);
/* ack even if the refresh failed, so the OSD stops resending */
2188 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2192 * Request sync osd watch/unwatch. The value of "start" determines
2193 * whether a watch request is being initiated or torn down.
/*
 * Synchronously register (start != 0) or unregister (start == 0) a
 * watch on the device's header object.  Uses a lingering WATCH osd
 * request; on successful start the object request is retained in
 * rbd_dev->watch_request, on teardown it is released.  Returns 0 or
 * negative errno.
 */
2195 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2197 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2198 struct rbd_obj_request *obj_request;
/* start and existing watch state must be mutually exclusive */
2201 rbd_assert(start ^ !!rbd_dev->watch_event);
2202 rbd_assert(start ^ !!rbd_dev->watch_request);
2205 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2206 &rbd_dev->watch_event);
2209 rbd_assert(rbd_dev->watch_event != NULL);
2213 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2214 OBJ_REQUEST_NODATA);
2218 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2219 if (!obj_request->osd_req)
/* starting: make the request linger; stopping: unregister the old one */
2223 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2225 ceph_osdc_unregister_linger_request(osdc,
2226 rbd_dev->watch_request->osd_req);
2228 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2229 rbd_dev->watch_event->cookie,
2230 rbd_dev->header.obj_version, start);
2231 rbd_osd_req_format_write(obj_request);
2233 ret = rbd_obj_request_submit(osdc, obj_request);
2236 ret = rbd_obj_request_wait(obj_request);
2239 ret = obj_request->result;
2244 * A watch request is set to linger, so the underlying osd
2245 * request won't go away until we unregister it. We retain
2246 * a pointer to the object request during that time (in
2247 * rbd_dev->watch_request), so we'll keep a reference to
2248 * it. We'll drop that reference (below) after we've
2252 rbd_dev->watch_request = obj_request;
2257 /* We have successfully torn down the watch request */
2259 rbd_obj_request_put(rbd_dev->watch_request);
2260 rbd_dev->watch_request = NULL;
2262 /* Cancel the event if we're tearing down, or on error */
2263 ceph_osdc_cancel_event(rbd_dev->watch_event);
2264 rbd_dev->watch_event = NULL;
2266 rbd_obj_request_put(obj_request);
2272 * Synchronous osd object method call
/*
 * Synchronously invoke class_name.method_name on object_name.
 * Optional outbound data is sent as method parameters via a pagelist;
 * up to inbound_size bytes of the reply are copied into the caller's
 * inbound buffer.  On success the object version is optionally
 * returned through *version.  Returns 0 (or byte count -- confirm in
 * elided lines) on success, negative errno on failure.
 */
2274 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2275 const char *object_name,
2276 const char *class_name,
2277 const char *method_name,
2278 const char *outbound,
2279 size_t outbound_size,
2281 size_t inbound_size,
2284 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2285 struct rbd_obj_request *obj_request;
2286 struct page **pages;
2291 * Method calls are ultimately read operations. The result
2292 * should be placed into the inbound buffer provided. They
2293 * also supply outbound data--parameters for the object
2294 * method. Currently if this is present it will be a
/* page vector to receive the method's reply data */
2297 page_count = (u32) calc_pages_for(0, inbound_size);
2298 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2300 return PTR_ERR(pages);
2303 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2308 obj_request->pages = pages;
2309 obj_request->page_count = page_count;
2311 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2312 if (!obj_request->osd_req)
2315 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2316 class_name, method_name);
2317 if (outbound_size) {
2318 struct ceph_pagelist *pagelist;
2320 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2324 ceph_pagelist_init(pagelist);
/* attach the method parameters as request data */
2325 ceph_pagelist_append(pagelist, outbound, outbound_size);
2326 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2329 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2330 obj_request->pages, inbound_size,
2332 rbd_osd_req_format_read(obj_request);
2334 ret = rbd_obj_request_submit(osdc, obj_request);
2337 ret = rbd_obj_request_wait(obj_request);
2341 ret = obj_request->result;
/* copy exactly what the OSD transferred back to the caller */
2345 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2347 *version = obj_request->version;
2350 rbd_obj_request_put(obj_request);
/* only freed here if ownership never passed to the object request */
2352 ceph_release_page_vector(pages, page_count);
/*
 * Block-layer request function.  Pulls requests off the queue,
 * filters out non-FS and zero-length requests, rejects writes to
 * read-only mappings and I/O to vanished snapshots, then builds,
 * fills and submits an image request per block request.  The queue
 * lock is dropped while the image request is built and reacquired
 * afterward (hence the __releases/__acquires annotations).
 */
2357 static void rbd_request_fn(struct request_queue *q)
2358 __releases(q->queue_lock) __acquires(q->queue_lock)
2360 struct rbd_device *rbd_dev = q->queuedata;
2361 bool read_only = rbd_dev->mapping.read_only;
2365 while ((rq = blk_fetch_request(q))) {
2366 bool write_request = rq_data_dir(rq) == WRITE;
2367 struct rbd_img_request *img_request;
2371 /* Ignore any non-FS requests that filter through. */
2373 if (rq->cmd_type != REQ_TYPE_FS) {
2374 dout("%s: non-fs request type %d\n", __func__,
2375 (int) rq->cmd_type);
2376 __blk_end_request_all(rq, 0);
2380 /* Ignore/skip any zero-length requests */
2382 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2383 length = (u64) blk_rq_bytes(rq);
2386 dout("%s: zero-length request\n", __func__);
2387 __blk_end_request_all(rq, 0);
/* drop the queue lock while we do the real work */
2391 spin_unlock_irq(q->queue_lock);
2393 /* Disallow writes to a read-only device */
2395 if (write_request) {
/* writes are only valid against the head (no snapshot mapped) */
2399 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2403 * Quit early if the mapped snapshot no longer
2404 * exists. It's still possible the snapshot will
2405 * have disappeared by the time our request arrives
2406 * at the osd, but there's no sense in sending it if
2409 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2410 dout("request for non-existent snapshot");
2411 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
/* guard against offset+length wrapping past u64 */
2417 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2418 goto end_request; /* Shouldn't happen */
2421 img_request = rbd_img_request_create(rbd_dev, offset, length,
2422 write_request, false);
2426 img_request->rq = rq;
2428 result = rbd_img_request_fill_bio(img_request, rq->bio);
2430 result = rbd_img_request_submit(img_request);
2432 rbd_img_request_put(img_request);
/* reacquire the queue lock before ending or refetching requests */
2434 spin_lock_irq(q->queue_lock);
2436 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2437 write_request ? "write" : "read",
2438 length, offset, result);
2440 __blk_end_request_all(rq, result);
2446 * a queue callback. Makes sure that we don't create a bio that spans across
2447 * multiple osd objects. One exception would be with a single page bios,
2448 * which we handle later at bio_chain_clone_range()
/*
 * Return how many bytes of bvec may be merged into the bio described
 * by bmd without the bio crossing an rbd object boundary.
 */
2450 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2451 struct bio_vec *bvec)
2453 struct rbd_device *rbd_dev = q->queuedata;
2454 sector_t sector_offset;
2455 sector_t sectors_per_obj;
2456 sector_t obj_sector_offset;
2460 * Find how far into its rbd object the partition-relative
2461 * bio start sector is to offset relative to the enclosing
2464 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
/* obj_order >= SECTOR_SHIFT, so this shift is non-negative */
2465 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2466 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2469 * Compute the number of bytes from that offset to the end
2470 * of the object. Account for what's already used by the bio.
2472 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2473 if (ret > bmd->bi_size)
2474 ret -= bmd->bi_size;
2479 * Don't send back more than was asked for. And if the bio
2480 * was empty, let the whole thing through because: "Note
2481 * that a block device *must* allow a single page to be
2482 * added to an empty bio."
2484 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2485 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2486 ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk for an rbd device (removal handling for a
 * disk that was added, plus queue cleanup; partially elided here).
 */
2491 static void rbd_free_disk(struct rbd_device *rbd_dev)
2493 struct gendisk *disk = rbd_dev->disk;
2498 if (disk->flags & GENHD_FL_UP)
2501 blk_cleanup_queue(disk->queue);
/*
 * Synchronously read [offset, offset+length) of the named object into
 * buf via a temporary page vector.  Optionally returns the object
 * version through *version.  Returns the byte count read (or 0 --
 * confirm in elided lines) on success, negative errno on failure.
 */
2505 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2506 const char *object_name,
2507 u64 offset, u64 length,
2508 char *buf, u64 *version)
2511 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2512 struct rbd_obj_request *obj_request;
2513 struct page **pages = NULL;
2518 page_count = (u32) calc_pages_for(offset, length);
2519 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2521 ret = PTR_ERR(pages);
2524 obj_request = rbd_obj_request_create(object_name, offset, length,
2529 obj_request->pages = pages;
2530 obj_request->page_count = page_count;
2532 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2533 if (!obj_request->osd_req)
2536 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2537 offset, length, 0, 0);
2538 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2540 obj_request->length,
/* data starts part-way into the first page if offset isn't aligned */
2541 obj_request->offset & ~PAGE_MASK,
2543 rbd_osd_req_format_read(obj_request);
2545 ret = rbd_obj_request_submit(osdc, obj_request);
2548 ret = rbd_obj_request_wait(obj_request);
2552 ret = obj_request->result;
2556 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2557 size = (size_t) obj_request->xferred;
2558 ceph_copy_from_page_vector(pages, buf, 0, size);
2559 rbd_assert(size <= (size_t) INT_MAX);
2562 *version = obj_request->version;
2565 rbd_obj_request_put(obj_request);
/* only freed here if ownership never passed to the object request */
2567 ceph_release_page_vector(pages, page_count);
2573 * Read the complete header for the given rbd device.
2575 * Returns a pointer to a dynamically-allocated buffer containing
2576 * the complete and validated header. Caller can pass the address
2577 * of a variable that will be filled in with the version of the
2578 * header object at the time it was read.
2580 * Returns a pointer-coded errno if a failure occurs.
2582 static struct rbd_image_header_ondisk *
2583 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2585 struct rbd_image_header_ondisk *ondisk = NULL;
2592 * The complete header will include an array of its 64-bit
2593 * snapshot ids, followed by the names of those snapshots as
2594 * a contiguous block of NUL-terminated strings. Note that
2595 * the number of snapshots could change by the time we read
2596 * it in, in which case we re-read it.
/* size the buffer for the header plus the current snapshot array */
2603 size = sizeof (*ondisk);
2604 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2606 ondisk = kmalloc(size, GFP_KERNEL);
2608 return ERR_PTR(-ENOMEM);
2610 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2612 (char *) ondisk, version);
2615 if (WARN_ON((size_t) ret < size)) {
2617 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2621 if (!rbd_dev_ondisk_valid(ondisk)) {
2623 rbd_warn(rbd_dev, "invalid header");
/* re-read if the snapshot count changed while we were reading */
2627 names_size = le64_to_cpu(ondisk->snap_names_len);
2628 want_count = snap_count;
2629 snap_count = le32_to_cpu(ondisk->snap_count);
2630 } while (snap_count != want_count);
2637 return ERR_PTR(ret);
2641 * reload the on-disk header
/*
 * Read and decode the v1 on-disk header into an in-core
 * rbd_image_header, recording the header object version seen.
 * Returns 0 on success or a negative errno.
 */
2643 static int rbd_read_header(struct rbd_device *rbd_dev,
2644 struct rbd_image_header *header)
2646 struct rbd_image_header_ondisk *ondisk;
2650 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2652 return PTR_ERR(ondisk);
2653 ret = rbd_header_from_disk(header, ondisk);
2655 header->obj_version = ver;
/*
 * Tear down every snapshot device attached to this rbd device.
 * Uses the _safe iterator because rbd_remove_snap_dev() unlinks
 * the entry from the list.
 */
2661 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2663 struct rbd_snap *snap;
2664 struct rbd_snap *next;
2666 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2667 rbd_remove_snap_dev(snap);
/*
 * Propagate a changed image size to the mapping and the gendisk
 * capacity.  Only applies when the head (not a snapshot) is mapped;
 * a mapped snapshot's size is fixed, hence the early bail-out.
 */
2670 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2674 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
/* Convert byte size to 512-byte sectors for the block layer */
2677 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2678 dout("setting size to %llu sectors", (unsigned long long) size);
2679 rbd_dev->mapping.size = (u64) size;
2680 set_capacity(rbd_dev->disk, size);
2684 * only read the first part of the ondisk header, without the snaps info
/*
 * Refresh the in-core header of a format 1 image from its on-disk
 * header, under header_rwsem held for write.  The freshly-read header
 * 'h' donates its snapc/snap_names/snap_sizes to rbd_dev->header;
 * the old copies are freed first.
 */
2686 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2689 struct rbd_image_header h;
2691 ret = rbd_read_header(rbd_dev, &h);
2695 down_write(&rbd_dev->header_rwsem);
2697 /* Update image size, and check for resize of mapped image */
2698 rbd_dev->header.image_size = h.image_size;
2699 rbd_update_mapping_size(rbd_dev);
2701 /* rbd_dev->header.object_prefix shouldn't change */
2702 kfree(rbd_dev->header.snap_sizes);
2703 kfree(rbd_dev->header.snap_names);
2704 /* osd requests may still refer to snapc */
2705 ceph_put_snap_context(rbd_dev->header.snapc);
/* hver is optional; callers that don't care pass NULL (guard elided) */
2708 *hver = h.obj_version;
2709 rbd_dev->header.obj_version = h.obj_version;
/* NOTE(review): image_size was already assigned above at 2698 */
2710 rbd_dev->header.image_size = h.image_size;
2711 rbd_dev->header.snapc = h.snapc;
2712 rbd_dev->header.snap_names = h.snap_names;
2713 rbd_dev->header.snap_sizes = h.snap_sizes;
2714 /* Free the extra copy of the object prefix */
2715 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2716 kfree(h.object_prefix);
2718 ret = rbd_dev_snaps_update(rbd_dev);
2720 ret = rbd_dev_snaps_register(rbd_dev);
2722 up_write(&rbd_dev->header_rwsem);
/*
 * Format-dispatching header refresh: v1 images re-read the on-disk
 * header, v2 images use class method calls.  Serialized by ctl_mutex.
 */
2727 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2731 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2732 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2733 if (rbd_dev->image_format == 1)
2734 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2736 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2737 mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for this rbd
 * device.  I/O limits are set to the rbd object size so a single
 * request never spans more than one backing rados object.
 * NOTE(review): error-path cleanup lines are elided from this listing.
 */
2742 static int rbd_init_disk(struct rbd_device *rbd_dev)
2744 struct gendisk *disk;
2745 struct request_queue *q;
2748 /* create gendisk info */
2749 disk = alloc_disk(RBD_MINORS_PER_MAJOR)
2753 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2755 disk->major = rbd_dev->major;
2756 disk->first_minor = 0;
2757 disk->fops = &rbd_bd_ops;
2758 disk->private_data = rbd_dev;
2760 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2764 /* We use the default size, but let's be explicit about it. */
2765 blk_queue_physical_block_size(q, SECTOR_SIZE);
2767 /* set io sizes to object size */
2768 segment_size = rbd_obj_bytes(&rbd_dev->header);
2769 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2770 blk_queue_max_segment_size(q, segment_size);
2771 blk_queue_io_min(q, segment_size);
2772 blk_queue_io_opt(q, segment_size);
/* Keep bios from straddling object boundaries */
2774 blk_queue_merge_bvec(q, rbd_merge_bvec);
2777 q->queuedata = rbd_dev;
2779 rbd_dev->disk = disk;
2781 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map an embedded struct device back to its owning rbd_device */
2794 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2796 return container_of(dev, struct rbd_device, dev);
/*
 * sysfs "size" attribute: mapped image size in bytes, derived from
 * the gendisk capacity (sectors) under header_rwsem.
 */
2799 static ssize_t rbd_size_show(struct device *dev,
2800 struct device_attribute *attr, char *buf)
2802 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2805 down_read(&rbd_dev->header_rwsem);
2806 size = get_capacity(rbd_dev->disk);
2807 up_read(&rbd_dev->header_rwsem);
2809 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2813 * Note this shows the features for whatever's mapped, which is not
2814 * necessarily the base image.
/* sysfs "features" attribute: feature bits of the mapped image/snap */
2816 static ssize_t rbd_features_show(struct device *dev,
2817 struct device_attribute *attr, char *buf)
2819 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2821 return sprintf(buf, "0x%016llx\n",
2822 (unsigned long long) rbd_dev->mapping.features);
/* sysfs "major" attribute: block device major number */
2825 static ssize_t rbd_major_show(struct device *dev,
2826 struct device_attribute *attr, char *buf)
2828 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2830 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: ceph client instance id */
2833 static ssize_t rbd_client_id_show(struct device *dev,
2834 struct device_attribute *attr, char *buf)
2836 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2838 return sprintf(buf, "client%lld\n",
2839 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the backing rados pool */
2842 static ssize_t rbd_pool_show(struct device *dev,
2843 struct device_attribute *attr, char *buf)
2845 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2847 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
/* sysfs "pool_id" attribute: numeric id of the backing rados pool */
2850 static ssize_t rbd_pool_id_show(struct device *dev,
2851 struct device_attribute *attr, char *buf)
2853 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2855 return sprintf(buf, "%llu\n",
2856 (unsigned long long) rbd_dev->spec->pool_id);
/*
 * sysfs "name" attribute: image name, or "(unknown)" when only the
 * image id is known (e.g. a parent image probed by id).
 */
2859 static ssize_t rbd_name_show(struct device *dev,
2860 struct device_attribute *attr, char *buf)
2862 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2864 if (rbd_dev->spec->image_name)
2865 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2867 return sprintf(buf, "(unknown)\n");
/* sysfs "image_id" attribute: rbd image id string */
2870 static ssize_t rbd_image_id_show(struct device *dev,
2871 struct device_attribute *attr, char *buf)
2873 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2875 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2879 * Shows the name of the currently-mapped snapshot (or
2880 * RBD_SNAP_HEAD_NAME for the base image).
/* sysfs "current_snap": mapped snapshot name, "-" for the head */
2882 static ssize_t rbd_snap_show(struct device *dev,
2883 struct device_attribute *attr,
2886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2888 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2892 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2893 * for the parent image. If there is no parent, simply shows
2894 * "(no parent image)".
/*
 * sysfs "parent" attribute: multi-line description of the parent
 * image (pool, image, snapshot, overlap) for a layered v2 image.
 * bufp advances past each sprintf (advancement lines elided here).
 */
2896 static ssize_t rbd_parent_show(struct device *dev,
2897 struct device_attribute *attr,
2900 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2901 struct rbd_spec *spec = rbd_dev->parent_spec;
2906 return sprintf(buf, "(no parent image)\n");
2908 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2909 (unsigned long long) spec->pool_id, spec->pool_name);
2914 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2915 spec->image_name ? spec->image_name : "(unknown)");
2920 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2921 (unsigned long long) spec->snap_id, spec->snap_name);
2926 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
/* Total bytes written is the distance bufp advanced */
2931 return (ssize_t) (bufp - buf);
/*
 * sysfs "refresh" store handler: any write triggers a header refresh.
 * Returns the write size on success so the write is consumed.
 */
2934 static ssize_t rbd_image_refresh(struct device *dev,
2935 struct device_attribute *attr,
2939 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2942 ret = rbd_dev_refresh(rbd_dev, NULL);
2944 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only (S_IWUSR) and re-reads the image header.
 */
2947 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2948 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2949 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2950 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2951 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2952 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2953 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2954 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2955 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2956 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2957 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2959 static struct attribute *rbd_attrs[] = {
2960 &dev_attr_size.attr,
2961 &dev_attr_features.attr,
2962 &dev_attr_major.attr,
2963 &dev_attr_client_id.attr,
2964 &dev_attr_pool.attr,
2965 &dev_attr_pool_id.attr,
2966 &dev_attr_name.attr,
2967 &dev_attr_image_id.attr,
2968 &dev_attr_current_snap.attr,
2969 &dev_attr_parent.attr,
2970 &dev_attr_refresh.attr,
2974 static struct attribute_group rbd_attr_group = {
2978 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op; rbd_dev lifetime is managed elsewhere */
2983 static void rbd_sysfs_dev_release(struct device *dev)
2987 static struct device_type rbd_device_type = {
2989 .groups = rbd_attr_groups,
2990 .release = rbd_sysfs_dev_release,
/* sysfs snapshot "snap_size" attribute: snapshot size in bytes */
2998 static ssize_t rbd_snap_size_show(struct device *dev,
2999 struct device_attribute *attr,
3002 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3004 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs snapshot "snap_id" attribute: snapshot id */
3007 static ssize_t rbd_snap_id_show(struct device *dev,
3008 struct device_attribute *attr,
3011 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3013 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs snapshot "snap_features" attribute: feature bit mask */
3016 static ssize_t rbd_snap_features_show(struct device *dev,
3017 struct device_attribute *attr,
3020 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3022 return sprintf(buf, "0x%016llx\n",
3023 (unsigned long long) snap->features);
/* Read-only sysfs attributes for snapshot devices */
3026 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3027 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3028 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3030 static struct attribute *rbd_snap_attrs[] = {
3031 &dev_attr_snap_size.attr,
3032 &dev_attr_snap_id.attr,
3033 &dev_attr_snap_features.attr,
3037 static struct attribute_group rbd_snap_attr_group = {
3038 .attrs = rbd_snap_attrs,
/* Device-model release: frees the rbd_snap (body elided in listing) */
3041 static void rbd_snap_dev_release(struct device *dev)
3043 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3048 static const struct attribute_group *rbd_snap_attr_groups[] = {
3049 &rbd_snap_attr_group,
3053 static struct device_type rbd_snap_device_type = {
3054 .groups = rbd_snap_attr_groups,
3055 .release = rbd_snap_dev_release,
/* Take a reference on an image spec; returns the spec for chaining */
3058 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3060 kref_get(&spec->kref);
/* Drop a reference on an image spec; frees it on the last put */
3065 static void rbd_spec_free(struct kref *kref);
3066 static void rbd_spec_put(struct rbd_spec *spec)
3069 kref_put(&spec->kref, rbd_spec_free);
/*
 * Allocate a zeroed image spec with one reference held.
 * Returns NULL on allocation failure (guard elided in listing).
 */
3072 static struct rbd_spec *rbd_spec_alloc(void)
3074 struct rbd_spec *spec;
3076 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3079 kref_init(&spec->kref);
/* kref release callback: free the spec's owned strings and itself */
3084 static void rbd_spec_free(struct kref *kref)
3086 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3088 kfree(spec->pool_name);
3089 kfree(spec->image_id);
3090 kfree(spec->image_name);
3091 kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd_device.  Takes ownership of the
 * caller's references on 'rbdc' and 'spec'.  Returns NULL on
 * allocation failure (guard elided in this listing).
 */
3095 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3096 struct rbd_spec *spec)
3098 struct rbd_device *rbd_dev;
3100 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3104 spin_lock_init(&rbd_dev->lock);
3106 INIT_LIST_HEAD(&rbd_dev->node);
3107 INIT_LIST_HEAD(&rbd_dev->snaps);
3108 init_rwsem(&rbd_dev->header_rwsem);
3110 rbd_dev->spec = spec;
3111 rbd_dev->rbd_client = rbdc;
3113 /* Initialize the layout used for all rbd requests */
/* One stripe per object, sized at the maximum rbd object order */
3115 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3116 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3117 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3118 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/* Release everything an rbd_device owns, then the device itself */
3123 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3125 rbd_spec_put(rbd_dev->parent_spec);
3126 kfree(rbd_dev->header_name);
3127 rbd_put_client(rbd_dev->rbd_client);
3128 rbd_spec_put(rbd_dev->spec);
/*
 * True if the snapshot's device has been registered with the driver
 * core.  Asserts the type assignment and registration state agree.
 */
3132 static bool rbd_snap_registered(struct rbd_snap *snap)
3134 bool ret = snap->dev.type == &rbd_snap_device_type;
3135 bool reg = device_is_registered(&snap->dev);
3137 rbd_assert(!ret ^ reg);
/* Unlink a snapshot from its device list and unregister its device */
3142 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3144 list_del(&snap->node);
3145 if (device_is_registered(&snap->dev))
3146 device_unregister(&snap->dev);
/*
 * Register a snapshot's device under 'parent' in sysfs, named
 * "snap_<name>".  Returns the device_register() result.
 */
3149 static int rbd_register_snap_dev(struct rbd_snap *snap,
3150 struct device *parent)
3152 struct device *dev = &snap->dev;
3155 dev->type = &rbd_snap_device_type;
3156 dev->parent = parent;
3157 dev->release = rbd_snap_dev_release;
3158 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3159 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3161 ret = device_register(dev);
/*
 * Allocate and fill an rbd_snap for the given snapshot attributes.
 * Returns the new snap or a pointer-coded errno; list insertion is
 * the caller's job.  (Cleanup/error lines elided in this listing.)
 */
3166 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3167 const char *snap_name,
3168 u64 snap_id, u64 snap_size,
3171 struct rbd_snap *snap;
3174 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3176 return ERR_PTR(-ENOMEM);
3179 snap->name = kstrdup(snap_name, GFP_KERNEL);
3184 snap->size = snap_size;
3185 snap->features = snap_features;
3193 return ERR_PTR(ret);
/*
 * Return size/features and the name for snapshot index 'which' of a
 * format 1 image.  The name is found by walking the contiguous
 * NUL-terminated name block 'which' entries in.
 */
3196 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3197 u64 *snap_size, u64 *snap_features)
3201 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3203 *snap_size = rbd_dev->header.snap_sizes[which];
3204 *snap_features = 0; /* No features for v1 */
3206 /* Skip over names until we find the one we are looking for */
3208 snap_name = rbd_dev->header.snap_names;
3210 snap_name += strlen(snap_name) + 1;
3216 * Get the size and object order for an image snapshot, or if
3217 * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Query the "get_size" class method on the header object for the
 * object order and size of the given snapshot (CEPH_NOSNAP for the
 * base image).  Returns 0 or a negative errno.
 */
3220 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3221 u8 *order, u64 *snap_size)
3223 __le64 snapid = cpu_to_le64(snap_id);
/* Reply layout: one byte of order followed by a little-endian size */
3228 } __attribute__ ((packed)) size_buf = { 0 };
3230 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3232 (char *) &snapid, sizeof (snapid),
3233 (char *) &size_buf, sizeof (size_buf), NULL);
3234 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3238 *order = size_buf.order;
3239 *snap_size = le64_to_cpu(size_buf.size);
3241 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3242 (unsigned long long) snap_id, (unsigned int) *order,
3243 (unsigned long long) *snap_size);
/* Fetch order and size of the base image (CEPH_NOSNAP) into the header */
3248 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3250 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3251 &rbd_dev->header.obj_order,
3252 &rbd_dev->header.image_size);
/*
 * Query the "get_object_prefix" class method and store the decoded
 * prefix string in the in-core header.  The caller owns freeing the
 * reply buffer (kfree lines elided in this listing).
 */
3255 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3261 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3265 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3266 "rbd", "get_object_prefix",
3268 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3269 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3274 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3275 p + RBD_OBJ_PREFIX_LEN_MAX,
3278 if (IS_ERR(rbd_dev->header.object_prefix)) {
3279 ret = PTR_ERR(rbd_dev->header.object_prefix);
3280 rbd_dev->header.object_prefix = NULL;
3282 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Query the "get_features" class method for the given snapshot.
 * Fails (error path elided) if any incompatible feature bit is set
 * that this client does not support.
 */
3291 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3294 __le64 snapid = cpu_to_le64(snap_id);
3298 } features_buf = { 0 };
3302 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3303 "rbd", "get_features",
3304 (char *) &snapid, sizeof (snapid),
3305 (char *) &features_buf, sizeof (features_buf),
3307 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
/* Reject images requiring features we don't implement */
3311 incompat = le64_to_cpu(features_buf.incompat);
3312 if (incompat & ~RBD_FEATURES_SUPPORTED)
3315 *snap_features = le64_to_cpu(features_buf.features);
3317 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3318 (unsigned long long) snap_id,
3319 (unsigned long long) *snap_features,
3320 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Fetch feature bits of the base image (CEPH_NOSNAP) into the header */
3325 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3327 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3328 &rbd_dev->header.features);
/*
 * Query the "get_parent" class method to learn this image's parent
 * (pool id, image id, snap id, overlap).  A pool id of CEPH_NOPOOL
 * means no parent.  On success rbd_dev takes ownership of the new
 * parent_spec; otherwise the local reference is dropped at out_err.
 */
3331 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3333 struct rbd_spec *parent_spec;
3335 void *reply_buf = NULL;
3343 parent_spec = rbd_spec_alloc();
3347 size = sizeof (__le64) + /* pool_id */
3348 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3349 sizeof (__le64) + /* snap_id */
3350 sizeof (__le64); /* overlap */
3351 reply_buf = kmalloc(size, GFP_KERNEL);
3357 snapid = cpu_to_le64(CEPH_NOSNAP);
3358 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3359 "rbd", "get_parent",
3360 (char *) &snapid, sizeof (snapid),
3361 (char *) reply_buf, size, NULL);
3362 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3368 end = (char *) reply_buf + size;
3369 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3370 if (parent_spec->pool_id == CEPH_NOPOOL)
3371 goto out; /* No parent? No problem. */
3373 /* The ceph file layout needs to fit pool id in 32 bits */
3376 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3379 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3380 if (IS_ERR(image_id)) {
3381 ret = PTR_ERR(image_id);
3384 parent_spec->image_id = image_id;
3385 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3386 ceph_decode_64_safe(&p, end, overlap, out_err);
3388 rbd_dev->parent_overlap = overlap;
3389 rbd_dev->parent_spec = parent_spec;
3390 parent_spec = NULL; /* rbd_dev now owns this */
/* Drops the local ref; harmless when ownership was transferred */
3395 rbd_spec_put(parent_spec);
/*
 * Look up an image's name from its id via the "dir_get_name" class
 * method on the rbd directory object.  Returns a newly-allocated
 * name string (caller frees) or a pointer-coded errno; buffer
 * cleanup lines are elided from this listing.
 */
3400 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3402 size_t image_id_size;
3407 void *reply_buf = NULL;
3409 char *image_name = NULL;
3412 rbd_assert(!rbd_dev->spec->image_name);
/* Build the ceph-encoded (length-prefixed) image id argument */
3414 len = strlen(rbd_dev->spec->image_id);
3415 image_id_size = sizeof (__le32) + len;
3416 image_id = kmalloc(image_id_size, GFP_KERNEL);
3421 end = (char *) image_id + image_id_size;
3422 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3424 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3425 reply_buf = kmalloc(size, GFP_KERNEL);
3429 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3430 "rbd", "dir_get_name",
3431 image_id, image_id_size,
3432 (char *) reply_buf, size, NULL);
3436 end = (char *) reply_buf + size;
3437 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3438 if (IS_ERR(image_name))
3441 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3450 * When a parent image gets probed, we only have the pool, image,
3451 * and snapshot ids but not the names of any of them. This call
3452 * is made later to fill in those names. It has to be done after
3453 * rbd_dev_snaps_update() has completed because some of the
3454 * information (in particular, snapshot name) is not available
/*
 * Fill in the human-readable names (pool, image, snapshot) for a
 * spec that so far holds only ids (e.g. a probed parent image).
 * Image-name lookup failure is tolerated; pool/snap lookup failure
 * unwinds the pool name at out_err.
 */
3457 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3459 struct ceph_osd_client *osdc;
3461 void *reply_buf = NULL;
3464 if (rbd_dev->spec->pool_name)
3465 return 0; /* Already have the names */
3467 /* Look up the pool name */
3469 osdc = &rbd_dev->rbd_client->client->osdc;
3470 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3472 rbd_warn(rbd_dev, "there is no pool with id %llu",
3473 rbd_dev->spec->pool_id); /* Really a BUG() */
3477 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3478 if (!rbd_dev->spec->pool_name)
3481 /* Fetch the image name; tolerate failure here */
3483 name = rbd_dev_image_name(rbd_dev);
3485 rbd_dev->spec->image_name = (char *) name;
3487 rbd_warn(rbd_dev, "unable to get image name");
3489 /* Look up the snapshot name. */
3491 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3493 rbd_warn(rbd_dev, "no snapshot with id %llu",
3494 rbd_dev->spec->snap_id); /* Really a BUG() */
3498 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
/* NOTE(review): missing space after "if" -- kernel style is "if (" */
3499 if(!rbd_dev->spec->snap_name)
3505 kfree(rbd_dev->spec->pool_name);
3506 rbd_dev->spec->pool_name = NULL;
/*
 * Query the "get_snapcontext" class method and build a new in-core
 * ceph_snap_context (seq + descending snapshot id array) for the
 * header.  Bounds-checks the reported snapshot count against both
 * SIZE_MAX and the reply buffer before allocating.
 */
3511 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3520 struct ceph_snap_context *snapc;
3524 * We'll need room for the seq value (maximum snapshot id),
3525 * snapshot count, and array of that many snapshot ids.
3526 * For now we have a fixed upper limit on the number we're
3527 * prepared to receive.
3529 size = sizeof (__le64) + sizeof (__le32) +
3530 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3531 reply_buf = kzalloc(size, GFP_KERNEL);
3535 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3536 "rbd", "get_snapcontext",
3538 reply_buf, size, ver);
3539 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3545 end = (char *) reply_buf + size;
3546 ceph_decode_64_safe(&p, end, seq, out);
3547 ceph_decode_32_safe(&p, end, snap_count, out);
3550 * Make sure the reported number of snapshot ids wouldn't go
3551 * beyond the end of our buffer. But before checking that,
3552 * make sure the computed size of the snapshot context we
3553 * allocate is representable in a size_t.
3555 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3560 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3563 size = sizeof (struct ceph_snap_context) +
3564 snap_count * sizeof (snapc->snaps[0]);
3565 snapc = kmalloc(size, GFP_KERNEL);
3571 atomic_set(&snapc->nref, 1);
3573 snapc->num_snaps = snap_count;
3574 for (i = 0; i < snap_count; i++)
3575 snapc->snaps[i] = ceph_decode_64(&p);
3577 rbd_dev->header.snapc = snapc;
3579 dout(" snap context seq = %llu, snap_count = %u\n",
3580 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Fetch the name of snapshot index 'which' via the
 * "get_snapshot_name" class method.  Returns a newly-allocated name
 * (caller frees) or a pointer-coded errno.
 */
3588 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3598 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3599 reply_buf = kmalloc(size, GFP_KERNEL);
3601 return ERR_PTR(-ENOMEM);
3603 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3604 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3605 "rbd", "get_snapshot_name",
3606 (char *) &snap_id, sizeof (snap_id),
3607 reply_buf, size, NULL);
3608 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3613 end = (char *) reply_buf + size;
3614 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3615 if (IS_ERR(snap_name)) {
3616 ret = PTR_ERR(snap_name);
3619 dout(" snap_id 0x%016llx snap_name = %s\n",
3620 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3628 return ERR_PTR(ret);
/*
 * Gather size, features and name for snapshot index 'which' of a
 * format 2 image.  Returns the name (or pointer-coded errno); size
 * and features are returned via the out parameters.
 */
3631 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3632 u64 *snap_size, u64 *snap_features)
3638 snap_id = rbd_dev->header.snapc->snaps[which];
3639 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3641 return ERR_PTR(ret);
3642 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3644 return ERR_PTR(ret);
3646 return rbd_dev_v2_snap_name(rbd_dev, which);
/* Format-dispatching snapshot info lookup (v1 or v2) */
3649 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3650 u64 *snap_size, u64 *snap_features)
3652 if (rbd_dev->image_format == 1)
3653 return rbd_dev_v1_snap_info(rbd_dev, which,
3654 snap_size, snap_features);
3655 if (rbd_dev->image_format == 2)
3656 return rbd_dev_v2_snap_info(rbd_dev, which,
3657 snap_size, snap_features);
3658 return ERR_PTR(-EINVAL);
/*
 * Refresh a format 2 image's header via class method calls, under
 * header_rwsem held for write: re-read size (object order must not
 * change), snapshot context, then update/register snapshot devices.
 */
3661 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3666 down_write(&rbd_dev->header_rwsem);
3668 /* Grab old order first, to see if it changes */
/*
 * NOTE(review): trailing comma makes this one comma-expression with
 * the next statement; behavior is unchanged but a ';' was intended.
 */
3670 obj_order = rbd_dev->header.obj_order,
3671 ret = rbd_dev_v2_image_size(rbd_dev);
3674 if (rbd_dev->header.obj_order != obj_order) {
3678 rbd_update_mapping_size(rbd_dev);
3680 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3681 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3684 ret = rbd_dev_snaps_update(rbd_dev);
3685 dout("rbd_dev_snaps_update returned %d\n", ret);
3688 ret = rbd_dev_snaps_register(rbd_dev);
3689 dout("rbd_dev_snaps_register returned %d\n", ret);
3691 up_write(&rbd_dev->header_rwsem);
3697 * Scan the rbd device's current snapshot list and compare it to the
3698 * newly-received snapshot context. Remove any existing snapshots
3699 * not present in the new snapshot context. Add a new snapshot for
3700 * any snapshots in the snapshot context not in the current list.
3701 * And verify there are no changes to snapshots we already know
3704 * Assumes the snapshots in the snapshot context are sorted by
3705 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3706 * are also maintained in that order.)
/*
 * Merge the newly-read snapshot context into the device's snapshot
 * list.  Both sequences are ordered by descending snapshot id, so
 * this is a single merge pass: remove list entries absent from the
 * context, add context entries absent from the list, and verify
 * entries present in both are unchanged.
 */
3708 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3710 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3711 const u32 snap_count = snapc->num_snaps;
3712 struct list_head *head = &rbd_dev->snaps;
3713 struct list_head *links = head->next;
3716 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3717 while (index < snap_count || links != head) {
3719 struct rbd_snap *snap;
3722 u64 snap_features = 0;
/* CEPH_NOSNAP / NULL act as sentinels when one sequence runs out */
3724 snap_id = index < snap_count ? snapc->snaps[index]
3726 snap = links != head ? list_entry(links, struct rbd_snap, node)
3728 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3730 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3731 struct list_head *next = links->next;
3734 * A previously-existing snapshot is not in
3735 * the new snap context.
3737 * If the now missing snapshot is the one the
3738 * image is mapped to, clear its exists flag
3739 * so we can avoid sending any more requests
3742 if (rbd_dev->spec->snap_id == snap->id)
3743 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3744 rbd_remove_snap_dev(snap);
3745 dout("%ssnap id %llu has been removed\n",
3746 rbd_dev->spec->snap_id == snap->id ?
3748 (unsigned long long) snap->id);
3750 /* Done with this list entry; advance */
3756 snap_name = rbd_dev_snap_info(rbd_dev, index,
3757 &snap_size, &snap_features);
3758 if (IS_ERR(snap_name))
3759 return PTR_ERR(snap_name);
/*
 * NOTE(review): prints snap_count where the current index was
 * presumably meant ("entry %u") -- confirm against upstream.
 */
3761 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3762 (unsigned long long) snap_id);
3763 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3764 struct rbd_snap *new_snap;
3766 /* We haven't seen this snapshot before */
3768 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3769 snap_id, snap_size, snap_features);
3770 if (IS_ERR(new_snap)) {
3771 int err = PTR_ERR(new_snap);
3773 dout(" failed to add dev, error %d\n", err);
3778 /* New goes before existing, or at end of list */
3780 dout(" added dev%s\n", snap ? "" : " at end\n");
3782 list_add_tail(&new_snap->node, &snap->node);
3784 list_add_tail(&new_snap->node, head);
3786 /* Already have this one */
3788 dout(" already present\n");
/* Sanity: attributes of a known snapshot must not change */
3790 rbd_assert(snap->size == snap_size);
3791 rbd_assert(!strcmp(snap->name, snap_name));
3792 rbd_assert(snap->features == snap_features);
3794 /* Done with this list entry; advance */
3796 links = links->next;
3799 /* Advance to the next entry in the snapshot context */
3803 dout("%s: done\n", __func__);
3809 * Scan the list of snapshots and register the devices for any that
3810 * have not already been registered.
/*
 * Register sysfs devices for any snapshots on the list that don't
 * have one yet.  Requires the parent rbd device to be registered.
 */
3812 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3814 struct rbd_snap *snap;
3817 dout("%s:\n", __func__);
3818 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3821 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3822 if (!rbd_snap_registered(snap)) {
3823 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3828 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus under rbd_root_dev, named
 * by its numeric dev_id.  Serialized by ctl_mutex.
 */
3833 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3838 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3840 dev = &rbd_dev->dev;
3841 dev->bus = &rbd_bus_type;
3842 dev->type = &rbd_device_type;
3843 dev->parent = &rbd_root_dev;
3844 dev->release = rbd_dev_release;
3845 dev_set_name(dev, "%d", rbd_dev->dev_id);
3846 ret = device_register(dev);
3848 mutex_unlock(&ctl_mutex);
/* Remove the rbd device from the driver core / rbd bus */
3853 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3855 device_unregister(&rbd_dev->dev);
3858 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3861 * Get a unique rbd identifier for the given new rbd_dev, and add
3862 * the rbd_dev to the global list. The minimum rbd id is 1.
/*
 * Assign the next free device id (ids start at 1) and link the
 * device onto the global rbd device list.
 */
3864 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3866 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3868 spin_lock(&rbd_dev_list_lock);
3869 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3870 spin_unlock(&rbd_dev_list_lock);
3871 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3872 (unsigned long long) rbd_dev->dev_id);
3876 * Remove an rbd_dev from the global list, and record that its
3877 * identifier is no longer in use.
/*
 * Unlink the device from the global list and, if it held the
 * current maximum id, lower rbd_dev_id_max to the largest id still
 * in use so ids can be reused.
 */
3879 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3881 struct list_head *tmp;
3882 int rbd_id = rbd_dev->dev_id;
3885 rbd_assert(rbd_id > 0);
3887 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3888 (unsigned long long) rbd_dev->dev_id);
3889 spin_lock(&rbd_dev_list_lock);
3890 list_del_init(&rbd_dev->node);
3893 * If the id being "put" is not the current maximum, there
3894 * is nothing special we need to do.
3896 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3897 spin_unlock(&rbd_dev_list_lock);
3902 * We need to update the current maximum id. Search the
3903 * list to find out what it is. We're more likely to find
3904 * the maximum at the end, so search the list backward.
3907 list_for_each_prev(tmp, &rbd_dev_list) {
3908 struct rbd_device *rbd_dev;
3910 rbd_dev = list_entry(tmp, struct rbd_device, node);
3911 if (rbd_dev->dev_id > max_id)
3912 max_id = rbd_dev->dev_id;
3914 spin_unlock(&rbd_dev_list_lock);
3917 * The max id could have been updated by rbd_dev_id_get(), in
3918 * which case it now accurately reflects the new maximum.
3919 * Be careful not to overwrite the maximum value in that
3922 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3923 dout(" max dev id has been reset\n");
3927 * Skips over white space at *buf, and updates *buf to point to the
3928 * first found non-space character (if any). Returns the length of
3929 * the token (string of non-white space characters) found. Note
3930 * that *buf must be terminated with '\0'.
/*
 * Advance *buf past leading whitespace and return the length of the
 * token now at *buf (0 if the string is exhausted).
 */
3932 static inline size_t next_token(const char **buf)
3935 * These are the characters that produce nonzero for
3936 * isspace() in the "C" and "POSIX" locales.
3938 const char *spaces = " \f\n\r\t\v";
3940 *buf += strspn(*buf, spaces); /* Find start of token */
3942 return strcspn(*buf, spaces); /* Return token length */
3946 * Finds the next token in *buf, and if the provided token buffer is
3947 * big enough, copies the found token into it. The result, if
3948 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3949 * must be terminated with '\0' on entry.
3951 * Returns the length of the token found (not including the '\0').
3952 * Return value will be 0 if no token is found, and it will be >=
3953 * token_size if the token would not fit.
3955 * The *buf pointer will be updated to point beyond the end of the
3956 * found token. Note that this occurs even if the token buffer is
3957 * too small to hold it.
/*
 * Copy the next token from *buf into 'token' (NUL-terminated) when
 * it fits in token_size; the copy is skipped, but *buf still
 * advances, when it does not.  Returns the token length.
 */
3959 static inline size_t copy_token(const char **buf,
3965 len = next_token(buf);
3966 if (len < token_size) {
3967 memcpy(token, *buf, len);
3968 *(token + len) = '\0';
3976 * Finds the next token in *buf, dynamically allocates a buffer big
3977 * enough to hold a copy of it, and copies the token into the new
3978 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3979 * that a duplicate buffer is created even for a zero-length token.
3981 * Returns a pointer to the newly-allocated duplicate, or a null
3982 * pointer if memory for the duplicate was not available. If
3983 * the lenp argument is a non-null pointer, the length of the token
3984 * (not including the '\0') is returned in *lenp.
3986 * If successful, the *buf pointer will be updated to point beyond
3987 * the end of the found token.
3989 * Note: uses GFP_KERNEL for allocation.
/*
 * Duplicate the next token from *buf into a fresh GFP_KERNEL buffer
 * and NUL-terminate it.  Returns the copy (caller frees) or NULL on
 * allocation failure; buf-advance/length lines elided in listing.
 */
3991 static inline char *dup_token(const char **buf, size_t *lenp)
3996 len = next_token(buf);
3997 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4000 *(dup + len) = '\0';
4010 * Parse the options provided for an "rbd add" (i.e., rbd image
4011 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4012 * and the data written is passed here via a NUL-terminated buffer.
4013 * Returns 0 if successful or an error code otherwise.
4015 * The information extracted from these options is recorded in
4016 * the other parameters which return dynamically-allocated
4019 * The address of a pointer that will refer to a ceph options
4020 * structure. Caller must release the returned pointer using
4021 * ceph_destroy_options() when it is no longer needed.
4023 * Address of an rbd options pointer. Fully initialized by
4024 * this function; caller must release with kfree().
4026 * Address of an rbd image specification pointer. Fully
4027 * initialized by this function based on parsed options.
4028 * Caller must release with rbd_spec_put().
4030 * The options passed take this form:
4031 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4034 * A comma-separated list of one or more monitor addresses.
4035 * A monitor address is an ip address, optionally followed
4036 * by a port number (separated by a colon).
4037 * I.e.: ip1[:port1][,ip2[:port2]...]
4039 * A comma-separated list of ceph and/or rbd options.
4041 * The name of the rados pool containing the rbd image.
4043 * The name of the image in that pool to map.
4045 * An optional snapshot id. If provided, the mapping will
4046 * present data from the image at the time that snapshot was
4047 * created. The image head is used if no snapshot id is
4048 * provided. Snapshot mappings are always read-only.
4050 static int rbd_add_parse_args(const char *buf,
4051 struct ceph_options **ceph_opts,
4052 struct rbd_options **opts,
4053 struct rbd_spec **rbd_spec)
4057 const char *mon_addrs;
4058 size_t mon_addrs_size;
4059 struct rbd_spec *spec = NULL;
4060 struct rbd_options *rbd_opts = NULL;
4061 struct ceph_options *copts;
4064 /* The first four tokens are required */
/*
 * Monitor-address token: measured in place, not duplicated.
 * NOTE(review): mon_addrs presumably points at this token (assignment
 * not visible here); ceph_parse_options() parses the range below.
 */
4066 len = next_token(&buf);
4068 rbd_warn(NULL, "no monitor address(es) provided");
/* +1 so that mon_addrs + mon_addrs_size - 1 is the token's end pointer */
4072 mon_addrs_size = len + 1;
/* Options token is duplicated; freed by this function when done (not visible here) */
4076 options = dup_token(&buf, NULL);
4080 rbd_warn(NULL, "no options provided");
4084 spec = rbd_spec_alloc();
4088 spec->pool_name = dup_token(&buf, NULL);
4089 if (!spec->pool_name)
/* dup_token() succeeds for a zero-length token; reject empty pool name */
4091 if (!*spec->pool_name) {
4092 rbd_warn(NULL, "no pool name provided");
4096 spec->image_name = dup_token(&buf, NULL);
4097 if (!spec->image_name)
4099 if (!*spec->image_name) {
4100 rbd_warn(NULL, "no image name provided");
4105 * Snapshot name is optional; default is to use "-"
4106 * (indicating the head/no snapshot).
4108 len = next_token(&buf);
4110 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4111 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4112 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4113 ret = -ENAMETOOLONG;
/* Copy len+1 bytes, then force the terminator (token may not be NUL at len) */
4116 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4117 if (!spec->snap_name)
4119 *(spec->snap_name + len) = '\0';
4121 /* Initialize all rbd options to the defaults */
4123 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4127 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
/*
 * libceph parses the generic options; rbd-specific ones are handled
 * by the parse_rbd_opts_token callback, which fills in rbd_opts.
 */
4129 copts = ceph_parse_options(options, mon_addrs,
4130 mon_addrs + mon_addrs_size - 1,
4131 parse_rbd_opts_token, rbd_opts);
4132 if (IS_ERR(copts)) {
4133 ret = PTR_ERR(copts);
4154 * An rbd format 2 image has a unique identifier, distinct from the
4155 * name given to it by the user. Internally, that identifier is
4156 * what's used to specify the names of objects related to the image.
4158 * A special "rbd id" object is used to map an rbd image name to its
4159 * id. If that object doesn't exist, then there is no v2 rbd image
4160 * with the supplied name.
4162 * This function will record the given rbd_dev's image_id field if
4163 * it can be determined, and in that case will return 0. If any
4164 * errors occur a negative errno will be returned and the rbd_dev's
4165 * image_id field will be unchanged (and should be NULL).
4167 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4175 /* If we already have it we don't need to look it up */
4177 if (rbd_dev->spec->image_id)
4181 * When probing a parent image, the image id is already
4182 * known (and the image name likely is not). There's no
4183 * need to fetch the image id again in this case.
4185 if (rbd_dev->spec->image_id)
/*
 * NOTE(review): the test above duplicates the image_id check a few
 * lines earlier; one of the two appears redundant -- confirm against
 * the full source before removing either.
 */
4189 * First, see if the format 2 image id file exists, and if
4190 * so, get the image's persistent id from it.
/* sizeof(RBD_ID_PREFIX) already includes room for the trailing '\0' */
4192 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
/* GFP_NOIO -- presumably to avoid recursing into block I/O here; confirm */
4193 object_name = kmalloc(size, GFP_NOIO);
4196 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4197 dout("rbd id object name is %s\n", object_name);
4199 /* Response will be an encoded string, which includes a length */
4201 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4202 response = kzalloc(size, GFP_NOIO);
/* Invoke the class method on the id object to fetch the encoded id */
4208 ret = rbd_obj_method_sync(rbd_dev, object_name,
4211 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4212 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
/* Decode the length-prefixed string; returns ERR_PTR() on failure */
4217 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4218 p + RBD_IMAGE_ID_LEN_MAX,
4220 if (IS_ERR(rbd_dev->spec->image_id)) {
4221 ret = PTR_ERR(rbd_dev->spec->image_id);
/* Keep the documented contract: image_id stays NULL on error */
4222 rbd_dev->spec->image_id = NULL;
4224 dout("image_id is %s\n", rbd_dev->spec->image_id);
/*
 * Probe an image assuming it is a format 1 (v1) rbd image: record an
 * empty image id, build the "<image_name>RBD_SUFFIX" header object
 * name, and read the on-disk header into rbd_dev->header.
 */
4233 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4238 /* Version 1 images have no id; empty string is used */
4240 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4241 if (!rbd_dev->spec->image_id)
4244 /* Record the header object name for this rbd image. */
/* sizeof(RBD_SUFFIX) covers the suffix plus the trailing '\0' */
4246 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4247 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4248 if (!rbd_dev->header_name) {
4252 sprintf(rbd_dev->header_name, "%s%s",
4253 rbd_dev->spec->image_name, RBD_SUFFIX);
4255 /* Populate rbd image metadata */
4257 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4261 /* Version 1 images have no parent (no layering) */
4263 rbd_dev->parent_spec = NULL;
4264 rbd_dev->parent_overlap = 0;
4266 rbd_dev->image_format = 1;
4268 dout("discovered version 1 image, header name is %s\n",
4269 rbd_dev->header_name);
/* Error path: undo the allocations above and restore NULL fields */
4274 kfree(rbd_dev->header_name);
4275 rbd_dev->header_name = NULL;
4276 kfree(rbd_dev->spec->image_id);
4277 rbd_dev->spec->image_id = NULL;
/*
 * Probe a format 2 (v2) rbd image: build the header object name from
 * the (already-known) image id, then fetch size/order, object prefix,
 * features, optional parent info, and the snapshot context.
 */
4282 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4289 * Image id was filled in by the caller. Record the header
4290 * object name for this rbd image.
/* sizeof(RBD_HEADER_PREFIX) covers the prefix plus the trailing '\0' */
4292 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4293 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4294 if (!rbd_dev->header_name)
4296 sprintf(rbd_dev->header_name, "%s%s",
4297 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4299 /* Get the size and object order for the image */
4301 ret = rbd_dev_v2_image_size(rbd_dev);
4305 /* Get the object prefix (a.k.a. block_name) for the image */
4307 ret = rbd_dev_v2_object_prefix(rbd_dev);
4311 /* Get and check the features for the image */
4313 ret = rbd_dev_v2_features(rbd_dev);
4317 /* If the image supports layering, get the parent info */
4319 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4320 ret = rbd_dev_v2_parent_info(rbd_dev);
4325 /* crypto and compression type aren't (yet) supported for v2 images */
4327 rbd_dev->header.crypt_type = 0;
4328 rbd_dev->header.comp_type = 0;
4330 /* Get the snapshot context, plus the header version */
4332 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4335 rbd_dev->header.obj_version = ver;
4337 rbd_dev->image_format = 2;
4339 dout("discovered version 2 image, header name is %s\n",
4340 rbd_dev->header_name);
/* Error path: unwind parent info, header name and object prefix */
4344 rbd_dev->parent_overlap = 0;
4345 rbd_spec_put(rbd_dev->parent_spec);
4346 rbd_dev->parent_spec = NULL;
4347 kfree(rbd_dev->header_name);
4348 rbd_dev->header_name = NULL;
4349 kfree(rbd_dev->header.object_prefix);
4350 rbd_dev->header.object_prefix = NULL;
/*
 * Finish bringing up a probed rbd device: update snapshots and the
 * mapping, allocate an id and block major, create the disk, register
 * with sysfs, probe the parent image (if layered), set up the header
 * watch, and finally announce the disk with add_disk().
 */
4355 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4357 struct rbd_device *parent = NULL;
4358 struct rbd_spec *parent_spec = NULL;
4359 struct rbd_client *rbdc = NULL;
4362 /* no need to lock here, as rbd_dev is not registered yet */
4363 ret = rbd_dev_snaps_update(rbd_dev);
4367 ret = rbd_dev_probe_update_spec(rbd_dev);
4371 ret = rbd_dev_set_mapping(rbd_dev);
4375 /* generate unique id: find highest unique id, add one */
4376 rbd_dev_id_get(rbd_dev);
4378 /* Fill in the device name, now that we have its id. */
4379 BUILD_BUG_ON(DEV_NAME_LEN
4380 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4381 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4383 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major, returned in ret */
4385 ret = register_blkdev(0, rbd_dev->name);
4388 rbd_dev->major = ret;
4390 /* Set up the blkdev mapping. */
4392 ret = rbd_init_disk(rbd_dev);
4394 goto err_out_blkdev;
4396 ret = rbd_bus_add_dev(rbd_dev);
4401 * At this point cleanup in the event of an error is the job
4402 * of the sysfs code (initiated by rbd_bus_del_dev()).
4404 /* Probe the parent if there is one */
4406 if (rbd_dev->parent_spec) {
4408 * We need to pass a reference to the client and the
4409 * parent spec when creating the parent rbd_dev.
4410 * Images related by parent/child relationships
4411 * always share both.
4413 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4414 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4416 parent = rbd_dev_create(rbdc, parent_spec);
4421 rbdc = NULL; /* parent now owns reference */
4422 parent_spec = NULL; /* parent now owns reference */
/* Recursive: the parent may itself have a parent */
4423 ret = rbd_dev_probe(parent);
4425 goto err_out_parent;
4426 rbd_dev->parent = parent;
/* Snapshot registration needs the header lock held for write */
4429 down_write(&rbd_dev->header_rwsem);
4430 ret = rbd_dev_snaps_register(rbd_dev);
4431 up_write(&rbd_dev->header_rwsem);
/* Start watching the header object for changes (1 == start) */
4435 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4439 /* Everything's ready. Announce the disk to the world. */
4441 add_disk(rbd_dev->disk);
4443 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4444 (unsigned long long) rbd_dev->mapping.size);
/* Error labels below unwind in reverse order of acquisition */
4449 rbd_dev_destroy(parent);
/* Safe if ownership was already transferred: both pointers are NULL then */
4451 rbd_spec_put(parent_spec);
4452 rbd_put_client(rbdc);
4454 /* this will also clean up rest of rbd_dev stuff */
4456 rbd_bus_del_dev(rbd_dev);
4460 rbd_free_disk(rbd_dev);
4462 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4464 rbd_dev_id_put(rbd_dev);
4466 rbd_remove_all_snaps(rbd_dev);
4472 * Probe for the existence of the header object for the given rbd
4473 * device. For format 2 images this includes determining the image
4476 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4481 * Get the id from the image id object. If it's not a
4482 * format 2 image, we'll get ENOENT back, and we'll assume
4483 * it's a format 1 image.
4485 ret = rbd_dev_image_id(rbd_dev);
/* No id object found: fall back to a format 1 probe */
4487 ret = rbd_dev_v1_probe(rbd_dev);
/* Id found: this is a format 2 image */
4489 ret = rbd_dev_v2_probe(rbd_dev);
4491 dout("probe failed, returning %d\n", ret);
4496 ret = rbd_dev_probe_finish(rbd_dev);
/* Finish failed: release the header populated by the probe above */
4498 rbd_header_free(&rbd_dev->header);
/*
 * sysfs "add" handler: parse the user-supplied mapping request,
 * connect a ceph client, resolve the pool, create the rbd_device
 * and probe it. Ownership of parsed pieces is handed off step by
 * step (see the inline "now owns this" comments).
 */
4503 static ssize_t rbd_add(struct bus_type *bus,
4507 struct rbd_device *rbd_dev = NULL;
4508 struct ceph_options *ceph_opts = NULL;
4509 struct rbd_options *rbd_opts = NULL;
4510 struct rbd_spec *spec = NULL;
4511 struct rbd_client *rbdc;
4512 struct ceph_osd_client *osdc;
/* Pin the module while a mapping is being created */
4515 if (!try_module_get(THIS_MODULE))
4518 /* parse add command */
4519 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4521 goto err_out_module;
4523 rbdc = rbd_get_client(ceph_opts);
4528 ceph_opts = NULL; /* rbd_dev client now owns this */
/* Resolve the pool name to its id via the current osdmap */
4531 osdc = &rbdc->client->osdc;
4532 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4534 goto err_out_client;
4535 spec->pool_id = (u64) rc;
4537 /* The ceph file layout needs to fit pool id in 32 bits */
4539 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4541 goto err_out_client;
4544 rbd_dev = rbd_dev_create(rbdc, spec);
4546 goto err_out_client;
4547 rbdc = NULL; /* rbd_dev now owns this */
4548 spec = NULL; /* rbd_dev now owns this */
4550 rbd_dev->mapping.read_only = rbd_opts->read_only;
4552 rbd_opts = NULL; /* done with this */
4554 rc = rbd_dev_probe(rbd_dev);
4556 goto err_out_rbd_dev;
/* Error labels: release whatever ownership was not yet transferred */
4560 rbd_dev_destroy(rbd_dev);
4562 rbd_put_client(rbdc);
4565 ceph_destroy_options(ceph_opts);
4569 module_put(THIS_MODULE);
4571 dout("Error adding device %s\n", buf);
4573 return (ssize_t) rc;
/*
 * Look up an rbd_device by its dev_id, walking the global device list
 * under rbd_dev_list_lock.
 * NOTE(review): presumably returns the matching rbd_dev on the early
 * unlock path and NULL after the loop -- return statements are not
 * visible here; confirm against the full source.
 */
4576 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4578 struct list_head *tmp;
4579 struct rbd_device *rbd_dev;
4581 spin_lock(&rbd_dev_list_lock);
4582 list_for_each(tmp, &rbd_dev_list) {
4583 rbd_dev = list_entry(tmp, struct rbd_device, node);
4584 if (rbd_dev->dev_id == dev_id) {
/* Found: drop the lock before handing the device back */
4585 spin_unlock(&rbd_dev_list_lock);
4589 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: tear down everything set up during
 * probe (header watch, disk, blkdev major, header fields, device id)
 * and drop the module reference taken in rbd_add().
 */
4593 static void rbd_dev_release(struct device *dev)
4595 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop watching the header object (0 == stop), if a watch was set up */
4597 if (rbd_dev->watch_event)
4598 rbd_dev_header_watch_sync(rbd_dev, 0);
4600 /* clean up and free blkdev */
4601 rbd_free_disk(rbd_dev);
4602 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4604 /* release allocated disk header fields */
4605 rbd_header_free(&rbd_dev->header);
4607 /* done with the id, and with the rbd_dev */
4608 rbd_dev_id_put(rbd_dev);
4609 rbd_assert(rbd_dev->rbd_client != NULL);
4610 rbd_dev_destroy(rbd_dev);
4612 /* release module ref */
4613 module_put(THIS_MODULE);
/*
 * Remove one rbd device: drop its snapshot devices, then delete it
 * from the bus (which triggers rbd_dev_release() via sysfs).
 */
4616 static void __rbd_remove(struct rbd_device *rbd_dev)
4618 rbd_remove_all_snaps(rbd_dev);
4619 rbd_bus_del_dev(rbd_dev);
/*
 * sysfs "remove" handler: parse the target device id, refuse removal
 * of an open device, then tear down the whole parent chain (deepest
 * ancestor first) before removing the target device itself.
 */
4622 static ssize_t rbd_remove(struct bus_type *bus,
4626 struct rbd_device *rbd_dev = NULL;
4631 rc = strict_strtoul(buf, 10, &ul);
4635 /* convert to int; abort if we lost anything in the conversion */
4636 target_id = (int) ul;
4637 if (target_id != ul)
4640 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4642 rbd_dev = __rbd_get_dev(target_id);
/* Refuse to remove a device that is still open; otherwise mark it */
4648 spin_lock_irq(&rbd_dev->lock);
4649 if (rbd_dev->open_count)
4652 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4653 spin_unlock_irq(&rbd_dev->lock);
/* Peel off ancestors one at a time, always removing the deepest first */
4657 while (rbd_dev->parent_spec) {
4658 struct rbd_device *first = rbd_dev;
4659 struct rbd_device *second = first->parent;
4660 struct rbd_device *third;
4663 * Follow to the parent with no grandparent and
/* Walk down until "second" is the deepest device (no parent of its own) */
4666 while (second && (third = second->parent)) {
4670 __rbd_remove(second);
/* Detach "first" from the ancestor just removed */
4671 rbd_spec_put(first->parent_spec);
4672 first->parent_spec = NULL;
4673 first->parent_overlap = 0;
4674 first->parent = NULL;
4676 __rbd_remove(rbd_dev);
4679 mutex_unlock(&ctl_mutex);
4685 * create control files in sysfs
4688 static int rbd_sysfs_init(void)
4692 ret = device_register(&rbd_root_dev);
/* Register the rbd bus; on failure undo the root device registration */
4696 ret = bus_register(&rbd_bus_type);
4698 device_unregister(&rbd_root_dev);
/* Tear down sysfs state in reverse order of rbd_sysfs_init() */
4703 static void rbd_sysfs_cleanup(void)
4705 bus_unregister(&rbd_bus_type);
4706 device_unregister(&rbd_root_dev);
/* Module init: check libceph compatibility, then register sysfs entries */
4709 static int __init rbd_init(void)
4713 if (!libceph_compatible(NULL)) {
4714 rbd_warn(NULL, "libceph incompatibility (quitting)");
4718 rc = rbd_sysfs_init();
4721 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files */
4725 static void __exit rbd_exit(void)
4727 rbd_sysfs_cleanup();
/* Module entry/exit hooks and metadata */
4730 module_init(rbd_init);
4731 module_exit(rbd_exit);
4733 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4734 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4735 MODULE_DESCRIPTION("rados block device");
4737 /* following authorship retained from original osdblk.c */
4738 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4740 MODULE_LICENSE("GPL");