2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
58 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
60 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
64 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_SNAP_HEAD_NAME "-"
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX 64
72 #define RBD_OBJ_PREFIX_LEN_MAX 64
76 #define RBD_FEATURE_LAYERING (1<<0)
77 #define RBD_FEATURE_STRIPINGV2 (1<<1)
78 #define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
81 /* Features supported by this (client software) implementation. */
83 #define RBD_FEATURES_SUPPORTED (0)
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
91 #define DEV_NAME_LEN 32
92 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
95 * block device image metadata (in-memory version)
97 struct rbd_image_header {
98 /* These four fields never change for a given rbd image */
105 /* The remaining fields need to be updated occasionally */
107 struct ceph_snap_context *snapc;
115 * An rbd image specification.
117 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118 * identify an image. Each rbd_dev structure includes a pointer to
119 * an rbd_spec structure that encapsulates this identity.
121 * Each of the id's in an rbd_spec has an associated name. For a
122 * user-mapped image, the names are supplied and the id's associated
123 * with them are looked up. For a layered image, a parent image is
124 * defined by the tuple, and the names are looked up.
126 * An rbd_dev structure contains a parent_spec pointer which is
127 * non-null if the image it represents is a child in a layered
128 * image. This pointer will refer to the rbd_spec structure used
129 * by the parent rbd_dev for its own identity (i.e., the structure
130 * is shared between the parent and child).
132 * Since these structures are populated once, during the discovery
133 * phase of image construction, they are effectively immutable so
134 * we make no effort to synchronize access to them.
136 * Note that code herein does not assume the image name is known (it
137 * could be a null pointer).
153 * an instance of the client. multiple devices may share an rbd client.
156 struct ceph_client *client;
158 struct list_head node;
161 struct rbd_img_request;
162 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
164 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
166 struct rbd_obj_request;
167 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
169 enum obj_request_type {
170 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
174 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
175 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
178 struct rbd_obj_request {
179 const char *object_name;
180 u64 offset; /* object start byte */
181 u64 length; /* bytes from offset */
184 struct rbd_img_request *img_request;
185 u64 img_offset; /* image relative offset */
186 struct list_head links; /* img_request->obj_requests */
187 u32 which; /* posn image request list */
189 enum obj_request_type type;
191 struct bio *bio_list;
198 struct ceph_osd_request *osd_req;
200 u64 xferred; /* bytes transferred */
204 rbd_obj_callback_t callback;
205 struct completion completion;
211 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
212 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
213 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
216 struct rbd_img_request {
217 struct rbd_device *rbd_dev;
218 u64 offset; /* starting image byte offset */
219 u64 length; /* byte count from offset */
222 u64 snap_id; /* for reads */
223 struct ceph_snap_context *snapc; /* for writes */
226 struct request *rq; /* block request */
227 struct rbd_obj_request *obj_request; /* obj req initiator */
229 spinlock_t completion_lock;/* protects next_completion */
231 rbd_img_callback_t callback;
232 u64 xferred;/* aggregate bytes transferred */
233 int result; /* first nonzero obj_request result */
235 u32 obj_request_count;
236 struct list_head obj_requests; /* rbd_obj_request structs */
241 #define for_each_obj_request(ireq, oreq) \
242 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
243 #define for_each_obj_request_from(ireq, oreq) \
244 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
245 #define for_each_obj_request_safe(ireq, oreq, n) \
246 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
252 struct list_head node;
267 int dev_id; /* blkdev unique id */
269 int major; /* blkdev assigned major */
270 struct gendisk *disk; /* blkdev's gendisk and rq */
272 u32 image_format; /* Either 1 or 2 */
273 struct rbd_client *rbd_client;
275 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
277 spinlock_t lock; /* queue, flags, open_count */
279 struct rbd_image_header header;
280 unsigned long flags; /* possibly lock protected */
281 struct rbd_spec *spec;
285 struct ceph_file_layout layout;
287 struct ceph_osd_event *watch_event;
288 struct rbd_obj_request *watch_request;
290 struct rbd_spec *parent_spec;
293 /* protects updating the header */
294 struct rw_semaphore header_rwsem;
296 struct rbd_mapping mapping;
298 struct list_head node;
300 /* list of snapshots */
301 struct list_head snaps;
305 unsigned long open_count; /* protected by lock */
309 * Flag bits for rbd_dev->flags. If atomicity is required,
310 * rbd_dev->lock is used to protect access.
312 * Currently, only the "removing" flag (which is coupled with the
313 * "open_count" field) requires atomic access.
316 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
317 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
320 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
322 static LIST_HEAD(rbd_dev_list); /* devices */
323 static DEFINE_SPINLOCK(rbd_dev_list_lock);
325 static LIST_HEAD(rbd_client_list); /* clients */
326 static DEFINE_SPINLOCK(rbd_client_list_lock);
328 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
329 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
331 static void rbd_dev_release(struct device *dev);
332 static void rbd_remove_snap_dev(struct rbd_snap *snap);
334 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
336 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
339 static struct bus_attribute rbd_bus_attrs[] = {
340 __ATTR(add, S_IWUSR, NULL, rbd_add),
341 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
345 static struct bus_type rbd_bus_type = {
347 .bus_attrs = rbd_bus_attrs,
350 static void rbd_root_dev_release(struct device *dev)
354 static struct device rbd_root_dev = {
356 .release = rbd_root_dev_release,
359 static __printf(2, 3)
360 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
362 struct va_format vaf;
370 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
371 else if (rbd_dev->disk)
372 printk(KERN_WARNING "%s: %s: %pV\n",
373 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
374 else if (rbd_dev->spec && rbd_dev->spec->image_name)
375 printk(KERN_WARNING "%s: image %s: %pV\n",
376 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
377 else if (rbd_dev->spec && rbd_dev->spec->image_id)
378 printk(KERN_WARNING "%s: id %s: %pV\n",
379 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
381 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
382 RBD_DRV_NAME, rbd_dev, &vaf);
387 #define rbd_assert(expr) \
388 if (unlikely(!(expr))) { \
389 printk(KERN_ERR "\nAssertion failure in %s() " \
391 "\trbd_assert(%s);\n\n", \
392 __func__, __LINE__, #expr); \
395 #else /* !RBD_DEBUG */
396 # define rbd_assert(expr) ((void) 0)
397 #endif /* !RBD_DEBUG */
399 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
400 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
402 static int rbd_open(struct block_device *bdev, fmode_t mode)
404 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
405 bool removing = false;
407 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
410 spin_lock_irq(&rbd_dev->lock);
411 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
414 rbd_dev->open_count++;
415 spin_unlock_irq(&rbd_dev->lock);
419 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
420 (void) get_device(&rbd_dev->dev);
421 set_device_ro(bdev, rbd_dev->mapping.read_only);
422 mutex_unlock(&ctl_mutex);
427 static int rbd_release(struct gendisk *disk, fmode_t mode)
429 struct rbd_device *rbd_dev = disk->private_data;
430 unsigned long open_count_before;
432 spin_lock_irq(&rbd_dev->lock);
433 open_count_before = rbd_dev->open_count--;
434 spin_unlock_irq(&rbd_dev->lock);
435 rbd_assert(open_count_before > 0);
437 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
438 put_device(&rbd_dev->dev);
439 mutex_unlock(&ctl_mutex);
444 static const struct block_device_operations rbd_bd_ops = {
445 .owner = THIS_MODULE,
447 .release = rbd_release,
451 * Initialize an rbd client instance.
454 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
456 struct rbd_client *rbdc;
459 dout("%s:\n", __func__);
460 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
464 kref_init(&rbdc->kref);
465 INIT_LIST_HEAD(&rbdc->node);
467 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
469 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
470 if (IS_ERR(rbdc->client))
472 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
474 ret = ceph_open_session(rbdc->client);
478 spin_lock(&rbd_client_list_lock);
479 list_add_tail(&rbdc->node, &rbd_client_list);
480 spin_unlock(&rbd_client_list_lock);
482 mutex_unlock(&ctl_mutex);
483 dout("%s: rbdc %p\n", __func__, rbdc);
488 ceph_destroy_client(rbdc->client);
490 mutex_unlock(&ctl_mutex);
494 ceph_destroy_options(ceph_opts);
495 dout("%s: error %d\n", __func__, ret);
501 * Find a ceph client with specific addr and configuration. If
502 * found, bump its reference count.
504 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
506 struct rbd_client *client_node;
509 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
512 spin_lock(&rbd_client_list_lock);
513 list_for_each_entry(client_node, &rbd_client_list, node) {
514 if (!ceph_compare_options(ceph_opts, client_node->client)) {
515 kref_get(&client_node->kref);
520 spin_unlock(&rbd_client_list_lock);
522 return found ? client_node : NULL;
532 /* string args above */
535 /* Boolean args above */
539 static match_table_t rbd_opts_tokens = {
541 /* string args above */
542 {Opt_read_only, "read_only"},
543 {Opt_read_only, "ro"}, /* Alternate spelling */
544 {Opt_read_write, "read_write"},
545 {Opt_read_write, "rw"}, /* Alternate spelling */
546 /* Boolean args above */
554 #define RBD_READ_ONLY_DEFAULT false
556 static int parse_rbd_opts_token(char *c, void *private)
558 struct rbd_options *rbd_opts = private;
559 substring_t argstr[MAX_OPT_ARGS];
560 int token, intval, ret;
562 token = match_token(c, rbd_opts_tokens, argstr);
566 if (token < Opt_last_int) {
567 ret = match_int(&argstr[0], &intval);
569 pr_err("bad mount option arg (not int) "
573 dout("got int token %d val %d\n", token, intval);
574 } else if (token > Opt_last_int && token < Opt_last_string) {
575 dout("got string token %d val %s\n", token,
577 } else if (token > Opt_last_string && token < Opt_last_bool) {
578 dout("got Boolean token %d\n", token);
580 dout("got token %d\n", token);
585 rbd_opts->read_only = true;
588 rbd_opts->read_only = false;
598 * Get a ceph client with specific addr and configuration, if one does
599 * not exist create it.
601 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
603 struct rbd_client *rbdc;
605 rbdc = rbd_client_find(ceph_opts);
606 if (rbdc) /* using an existing client */
607 ceph_destroy_options(ceph_opts);
609 rbdc = rbd_client_create(ceph_opts);
615 * Destroy ceph client
617 * Caller must hold rbd_client_list_lock.
619 static void rbd_client_release(struct kref *kref)
621 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
623 dout("%s: rbdc %p\n", __func__, rbdc);
624 spin_lock(&rbd_client_list_lock);
625 list_del(&rbdc->node);
626 spin_unlock(&rbd_client_list_lock);
628 ceph_destroy_client(rbdc->client);
633 * Drop reference to ceph client node. If it's not referenced anymore, release
636 static void rbd_put_client(struct rbd_client *rbdc)
639 kref_put(&rbdc->kref, rbd_client_release);
642 static bool rbd_image_format_valid(u32 image_format)
644 return image_format == 1 || image_format == 2;
647 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
652 /* The header has to start with the magic rbd header text */
653 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
656 /* The bio layer requires at least sector-sized I/O */
658 if (ondisk->options.order < SECTOR_SHIFT)
661 /* If we use u64 in a few spots we may be able to loosen this */
663 if (ondisk->options.order > 8 * sizeof (int) - 1)
667 * The size of a snapshot header has to fit in a size_t, and
668 * that limits the number of snapshots.
670 snap_count = le32_to_cpu(ondisk->snap_count);
671 size = SIZE_MAX - sizeof (struct ceph_snap_context);
672 if (snap_count > size / sizeof (__le64))
676 * Not only that, but the size of the entire the snapshot
677 * header must also be representable in a size_t.
679 size -= snap_count * sizeof (__le64);
680 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
687 * Create a new header structure, translate header format from the on-disk
690 static int rbd_header_from_disk(struct rbd_image_header *header,
691 struct rbd_image_header_ondisk *ondisk)
698 memset(header, 0, sizeof (*header));
700 snap_count = le32_to_cpu(ondisk->snap_count);
702 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
703 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
704 if (!header->object_prefix)
706 memcpy(header->object_prefix, ondisk->object_prefix, len);
707 header->object_prefix[len] = '\0';
710 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
712 /* Save a copy of the snapshot names */
714 if (snap_names_len > (u64) SIZE_MAX)
716 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
717 if (!header->snap_names)
720 * Note that rbd_dev_v1_header_read() guarantees
721 * the ondisk buffer we're working with has
722 * snap_names_len bytes beyond the end of the
723 * snapshot id array, this memcpy() is safe.
725 memcpy(header->snap_names, &ondisk->snaps[snap_count],
728 /* Record each snapshot's size */
730 size = snap_count * sizeof (*header->snap_sizes);
731 header->snap_sizes = kmalloc(size, GFP_KERNEL);
732 if (!header->snap_sizes)
734 for (i = 0; i < snap_count; i++)
735 header->snap_sizes[i] =
736 le64_to_cpu(ondisk->snaps[i].image_size);
738 WARN_ON(ondisk->snap_names_len);
739 header->snap_names = NULL;
740 header->snap_sizes = NULL;
743 header->features = 0; /* No features support in v1 images */
744 header->obj_order = ondisk->options.order;
745 header->crypt_type = ondisk->options.crypt_type;
746 header->comp_type = ondisk->options.comp_type;
748 /* Allocate and fill in the snapshot context */
750 header->image_size = le64_to_cpu(ondisk->image_size);
751 size = sizeof (struct ceph_snap_context);
752 size += snap_count * sizeof (header->snapc->snaps[0]);
753 header->snapc = kzalloc(size, GFP_KERNEL);
757 atomic_set(&header->snapc->nref, 1);
758 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
759 header->snapc->num_snaps = snap_count;
760 for (i = 0; i < snap_count; i++)
761 header->snapc->snaps[i] =
762 le64_to_cpu(ondisk->snaps[i].id);
767 kfree(header->snap_sizes);
768 header->snap_sizes = NULL;
769 kfree(header->snap_names);
770 header->snap_names = NULL;
771 kfree(header->object_prefix);
772 header->object_prefix = NULL;
777 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
779 struct rbd_snap *snap;
781 if (snap_id == CEPH_NOSNAP)
782 return RBD_SNAP_HEAD_NAME;
784 list_for_each_entry(snap, &rbd_dev->snaps, node)
785 if (snap_id == snap->id)
791 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
794 struct rbd_snap *snap;
796 list_for_each_entry(snap, &rbd_dev->snaps, node) {
797 if (!strcmp(snap_name, snap->name)) {
798 rbd_dev->spec->snap_id = snap->id;
799 rbd_dev->mapping.size = snap->size;
800 rbd_dev->mapping.features = snap->features;
809 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
813 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
814 sizeof (RBD_SNAP_HEAD_NAME))) {
815 rbd_dev->spec->snap_id = CEPH_NOSNAP;
816 rbd_dev->mapping.size = rbd_dev->header.image_size;
817 rbd_dev->mapping.features = rbd_dev->header.features;
820 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
823 rbd_dev->mapping.read_only = true;
825 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
831 static void rbd_header_free(struct rbd_image_header *header)
833 kfree(header->object_prefix);
834 header->object_prefix = NULL;
835 kfree(header->snap_sizes);
836 header->snap_sizes = NULL;
837 kfree(header->snap_names);
838 header->snap_names = NULL;
839 ceph_put_snap_context(header->snapc);
840 header->snapc = NULL;
843 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
849 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
852 segment = offset >> rbd_dev->header.obj_order;
853 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
854 rbd_dev->header.object_prefix, segment);
855 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
856 pr_err("error formatting segment name for #%llu (%d)\n",
865 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
867 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
869 return offset & (segment_size - 1);
872 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
873 u64 offset, u64 length)
875 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
877 offset &= segment_size - 1;
879 rbd_assert(length <= U64_MAX - offset);
880 if (offset + length > segment_size)
881 length = segment_size - offset;
887 * returns the size of an object in the image
889 static u64 rbd_obj_bytes(struct rbd_image_header *header)
891 return 1 << header->obj_order;
898 static void bio_chain_put(struct bio *chain)
904 chain = chain->bi_next;
910 * zeros a bio chain, starting at specific offset
912 static void zero_bio_chain(struct bio *chain, int start_ofs)
921 bio_for_each_segment(bv, chain, i) {
922 if (pos + bv->bv_len > start_ofs) {
923 int remainder = max(start_ofs - pos, 0);
924 buf = bvec_kmap_irq(bv, &flags);
925 memset(buf + remainder, 0,
926 bv->bv_len - remainder);
927 bvec_kunmap_irq(buf, &flags);
932 chain = chain->bi_next;
937 * Clone a portion of a bio, starting at the given byte offset
938 * and continuing for the number of bytes indicated.
940 static struct bio *bio_clone_range(struct bio *bio_src,
949 unsigned short end_idx;
953 /* Handle the easy case for the caller */
955 if (!offset && len == bio_src->bi_size)
956 return bio_clone(bio_src, gfpmask);
958 if (WARN_ON_ONCE(!len))
960 if (WARN_ON_ONCE(len > bio_src->bi_size))
962 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
965 /* Find first affected segment... */
968 __bio_for_each_segment(bv, bio_src, idx, 0) {
969 if (resid < bv->bv_len)
975 /* ...and the last affected segment */
978 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
979 if (resid <= bv->bv_len)
983 vcnt = end_idx - idx + 1;
985 /* Build the clone */
987 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
989 return NULL; /* ENOMEM */
991 bio->bi_bdev = bio_src->bi_bdev;
992 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
993 bio->bi_rw = bio_src->bi_rw;
994 bio->bi_flags |= 1 << BIO_CLONED;
997 * Copy over our part of the bio_vec, then update the first
998 * and last (or only) entries.
1000 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1001 vcnt * sizeof (struct bio_vec));
1002 bio->bi_io_vec[0].bv_offset += voff;
1004 bio->bi_io_vec[0].bv_len -= voff;
1005 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1007 bio->bi_io_vec[0].bv_len = len;
1010 bio->bi_vcnt = vcnt;
1018 * Clone a portion of a bio chain, starting at the given byte offset
1019 * into the first bio in the source chain and continuing for the
1020 * number of bytes indicated. The result is another bio chain of
1021 * exactly the given length, or a null pointer on error.
1023 * The bio_src and offset parameters are both in-out. On entry they
1024 * refer to the first source bio and the offset into that bio where
1025 * the start of data to be cloned is located.
1027 * On return, bio_src is updated to refer to the bio in the source
1028 * chain that contains first un-cloned byte, and *offset will
1029 * contain the offset of that byte within that bio.
1031 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1032 unsigned int *offset,
1036 struct bio *bi = *bio_src;
1037 unsigned int off = *offset;
1038 struct bio *chain = NULL;
1041 /* Build up a chain of clone bios up to the limit */
1043 if (!bi || off >= bi->bi_size || !len)
1044 return NULL; /* Nothing to clone */
1048 unsigned int bi_size;
1052 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1053 goto out_err; /* EINVAL; ran out of bio's */
1055 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1056 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1058 goto out_err; /* ENOMEM */
1061 end = &bio->bi_next;
1064 if (off == bi->bi_size) {
1075 bio_chain_put(chain);
1081 * The default/initial value for all object request flags is 0. For
1082 * each flag, once its value is set to 1 it is never reset to 0
1085 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1087 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1088 struct rbd_img_request *img_request = obj_request->img_request;
1089 struct rbd_device *rbd_dev;
1091 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1092 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1097 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1100 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1103 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1105 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1106 struct rbd_img_request *img_request = obj_request->img_request;
1107 struct rbd_device *rbd_dev;
1109 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1110 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1115 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1118 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1121 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1123 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1124 atomic_read(&obj_request->kref.refcount));
1125 kref_get(&obj_request->kref);
1128 static void rbd_obj_request_destroy(struct kref *kref);
1129 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1131 rbd_assert(obj_request != NULL);
1132 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1133 atomic_read(&obj_request->kref.refcount));
1134 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1137 static void rbd_img_request_get(struct rbd_img_request *img_request)
1139 dout("%s: img %p (was %d)\n", __func__, img_request,
1140 atomic_read(&img_request->kref.refcount));
1141 kref_get(&img_request->kref);
1144 static void rbd_img_request_destroy(struct kref *kref);
1145 static void rbd_img_request_put(struct rbd_img_request *img_request)
1147 rbd_assert(img_request != NULL);
1148 dout("%s: img %p (was %d)\n", __func__, img_request,
1149 atomic_read(&img_request->kref.refcount));
1150 kref_put(&img_request->kref, rbd_img_request_destroy);
1153 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1154 struct rbd_obj_request *obj_request)
1156 rbd_assert(obj_request->img_request == NULL);
1158 rbd_obj_request_get(obj_request);
1159 obj_request->img_request = img_request;
1160 obj_request->which = img_request->obj_request_count;
1161 rbd_assert(!obj_request_img_data_test(obj_request));
1162 obj_request_img_data_set(obj_request);
1163 rbd_assert(obj_request->which != BAD_WHICH);
1164 img_request->obj_request_count++;
1165 list_add_tail(&obj_request->links, &img_request->obj_requests);
1166 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1167 obj_request->which);
1170 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1171 struct rbd_obj_request *obj_request)
1173 rbd_assert(obj_request->which != BAD_WHICH);
1175 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1176 obj_request->which);
1177 list_del(&obj_request->links);
1178 rbd_assert(img_request->obj_request_count > 0);
1179 img_request->obj_request_count--;
1180 rbd_assert(obj_request->which == img_request->obj_request_count);
1181 obj_request->which = BAD_WHICH;
1182 rbd_assert(obj_request_img_data_test(obj_request));
1183 rbd_assert(obj_request->img_request == img_request);
1184 obj_request->img_request = NULL;
1185 obj_request->callback = NULL;
1186 rbd_obj_request_put(obj_request);
1189 static bool obj_request_type_valid(enum obj_request_type type)
1192 case OBJ_REQUEST_NODATA:
1193 case OBJ_REQUEST_BIO:
1194 case OBJ_REQUEST_PAGES:
1201 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1202 struct rbd_obj_request *obj_request)
1204 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1206 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1209 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1212 dout("%s: img %p\n", __func__, img_request);
1215 * If no error occurred, compute the aggregate transfer
1216 * count for the image request. We could instead use
1217 * atomic64_cmpxchg() to update it as each object request
1218 * completes; not clear which way is better off hand.
1220 if (!img_request->result) {
1221 struct rbd_obj_request *obj_request;
1224 for_each_obj_request(img_request, obj_request)
1225 xferred += obj_request->xferred;
1226 img_request->xferred = xferred;
1229 if (img_request->callback)
1230 img_request->callback(img_request);
1232 rbd_img_request_put(img_request);
1235 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1237 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1239 dout("%s: obj %p\n", __func__, obj_request);
1241 return wait_for_completion_interruptible(&obj_request->completion);
1245 * The default/initial value for all image request flags is 0. Each
1246 * is conditionally set to 1 at image request initialization time
1247 * and currently never change thereafter.
1249 static void img_request_write_set(struct rbd_img_request *img_request)
1251 set_bit(IMG_REQ_WRITE, &img_request->flags);
1255 static bool img_request_write_test(struct rbd_img_request *img_request)
1258 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1261 static void img_request_child_set(struct rbd_img_request *img_request)
1263 set_bit(IMG_REQ_CHILD, &img_request->flags);
1267 static bool img_request_child_test(struct rbd_img_request *img_request)
1270 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1273 static void img_request_layered_set(struct rbd_img_request *img_request)
1275 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1279 static bool img_request_layered_test(struct rbd_img_request *img_request)
1282 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1286 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1288 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1289 obj_request, obj_request->img_request, obj_request->result,
1290 obj_request->xferred, obj_request->length);
1292 * ENOENT means a hole in the image. We zero-fill the
1293 * entire length of the request. A short read also implies
1294 * zero-fill to the end of the request. Either way we
1295 * update the xferred count to indicate the whole request
1298 BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1299 if (obj_request->result == -ENOENT) {
1300 zero_bio_chain(obj_request->bio_list, 0);
1301 obj_request->result = 0;
1302 obj_request->xferred = obj_request->length;
1303 } else if (obj_request->xferred < obj_request->length &&
1304 !obj_request->result) {
1305 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1306 obj_request->xferred = obj_request->length;
1308 obj_request_done_set(obj_request);
1311 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1313 dout("%s: obj %p cb %p\n", __func__, obj_request,
1314 obj_request->callback);
1315 if (obj_request->callback)
1316 obj_request->callback(obj_request);
1318 complete_all(&obj_request->completion);
/*
 * Completion callback for OSD ops that need no result processing:
 * just mark the object request done.
 */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1327 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1329 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1330 obj_request->result, obj_request->xferred, obj_request->length);
1331 if (obj_request->img_request)
1332 rbd_img_obj_request_read_callback(obj_request);
1334 obj_request_done_set(obj_request);
1337 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1339 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1340 obj_request->result, obj_request->length);
1342 * There is no such thing as a successful short write.
1343 * Our xferred value is the number of bytes transferred
1344 * back. Set it to our originally-requested length.
1346 obj_request->xferred = obj_request->length;
1347 obj_request_done_set(obj_request);
1351 * For a simple stat call there's nothing to do. We'll do more if
1352 * this is part of a write sequence for a layered image.
/*
 * Completion callback for a stat op.  Nothing to process for a
 * simple stat; just mark the object request done.  (More will be
 * needed here when this is part of a layered-image write sequence.)
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
/*
 * Main libceph completion callback for every rbd osd request.
 * Records result and xferred on the object request, dispatches to the
 * per-opcode handler, and completes the object request once the handler
 * has marked it done.  Runs in osd client reply context.
 */
1360 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1361 				struct ceph_msg *msg)
1363 	struct rbd_obj_request *obj_request = osd_req->r_priv;
1366 	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1367 	rbd_assert(osd_req == obj_request->osd_req);
/* Image-data requests have a non-null img_request and a valid "which"
 * index; standalone requests have neither.  The two XOR asserts encode
 * that these flags must be consistent. */
1368 	rbd_assert(obj_request_img_data_test(obj_request) ^
1369 				!obj_request->img_request);
1370 	rbd_assert(obj_request_img_data_test(obj_request) ^
1371 				(obj_request->which == BAD_WHICH));
1373 	if (osd_req->r_result < 0)
1374 		obj_request->result = osd_req->r_result;
1375 	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
/* Only single-op requests are issued by this driver today. */
1377 	WARN_ON(osd_req->r_num_ops != 1);	/* For now */
1380 	 * We support a 64-bit length, but ultimately it has to be
1381 	 * passed to blk_end_request(), which takes an unsigned int.
1383 	obj_request->xferred = osd_req->r_reply_op_len[0];
1384 	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
/* NOTE(review): the `switch (opcode)` statement and the opcode variable
 * declaration are elided at the listing gaps around here — the case
 * labels below belong to that switch. */
1385 	opcode = osd_req->r_ops[0].op;
1387 	case CEPH_OSD_OP_READ:
1388 		rbd_osd_read_callback(obj_request);
1390 	case CEPH_OSD_OP_WRITE:
1391 		rbd_osd_write_callback(obj_request);
1393 	case CEPH_OSD_OP_STAT:
1394 		rbd_osd_stat_callback(obj_request);
1396 	case CEPH_OSD_OP_CALL:
1397 	case CEPH_OSD_OP_NOTIFY_ACK:
1398 	case CEPH_OSD_OP_WATCH:
1399 		rbd_osd_trivial_callback(obj_request);
/* Default case: unknown opcode is logged but not fatal. */
1402 		rbd_warn(NULL, "%s: unsupported op %hu\n",
1403 			obj_request->object_name, (unsigned short) opcode);
/* Handlers may defer completion; only complete once marked done. */
1407 	if (obj_request_done_test(obj_request))
1408 		rbd_obj_request_complete(obj_request);
/*
 * Finalize an osd request before submission: pick the snapshot context
 * (writes) or snapshot id (reads of a mapped snapshot) and hand the
 * request to ceph_osdc_build_request() for encoding.
 * NOTE(review): the signature's second parameter line (`bool
 * write_request`) and the write-branch mtime setup are elided at the
 * listing gaps — TODO confirm against the full source.
 */
1411 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1414 	struct rbd_img_request *img_request = obj_request->img_request;
1415 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1416 	struct ceph_snap_context *snapc = NULL;
/* Defaults: no snapshot context, read from HEAD, no mtime. */
1417 	u64 snap_id = CEPH_NOSNAP;
1418 	struct timespec *mtime = NULL;
1419 	struct timespec now;
1421 	rbd_assert(osd_req != NULL);
1423 	if (write_request) {
1427 		snapc = img_request->snapc;
1428 	} else if (img_request) {
1429 		snap_id = img_request->snap_id;
1431 	ceph_osdc_build_request(osd_req, obj_request->offset,
1432 			snapc, snap_id, mtime);
/*
 * Allocate and initialize a single-op osd request for the given object
 * request.  For image-data requests the snapshot context is taken from
 * the owning image request.  Returns NULL on allocation failure.
 * NOTE(review): the `bool write_request` parameter line and the
 * read/write r_flags conditional are partially elided at listing gaps.
 */
1435 static struct ceph_osd_request *rbd_osd_req_create(
1436 					struct rbd_device *rbd_dev,
1438 					struct rbd_obj_request *obj_request)
1440 	struct ceph_snap_context *snapc = NULL;
1441 	struct ceph_osd_client *osdc;
1442 	struct ceph_osd_request *osd_req;
1444 	if (obj_request_img_data_test(obj_request)) {
1445 		struct rbd_img_request *img_request = obj_request->img_request;
/* Direction recorded on the object request must match the image request. */
1447 		rbd_assert(write_request ==
1448 				img_request_write_test(img_request));
1450 			snapc = img_request->snapc;
1453 	/* Allocate and initialize the request, for the single op */
/* GFP_ATOMIC: may be called from the request-queue path with a lock held. */
1455 	osdc = &rbd_dev->rbd_client->client->osdc;
1456 	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1458 		return NULL;	/* ENOMEM */
1461 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1463 		osd_req->r_flags = CEPH_OSD_FLAG_READ;
1465 	osd_req->r_callback = rbd_osd_req_callback;
1466 	osd_req->r_priv = obj_request;
/* The object name must fit the fixed-size oid buffer. */
1468 	osd_req->r_oid_len = strlen(obj_request->object_name);
1469 	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1470 	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1472 	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
/*
 * Drop our reference to an osd request; libceph frees it when the
 * last reference goes away.
 */
1477 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1479 	ceph_osdc_put_request(osd_req);
/*
 * Allocate and initialize an object request.  The object name is copied
 * into trailing storage allocated together with the request itself, so
 * the caller keeps ownership of (and may free) its own copy.
 * Returns the new request, or NULL on allocation failure (the elided
 * lines at the listing gaps presumably hold the NULL check and return).
 */
1482 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1484 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1485 						u64 offset, u64 length,
1486 						enum obj_request_type type)
1488 	struct rbd_obj_request *obj_request;
1492 	rbd_assert(obj_request_type_valid(type));
/* One allocation for the struct plus the NUL-terminated name copy. */
1494 	size = strlen(object_name) + 1;
1495 	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1499 	name = (char *)(obj_request + 1);
1500 	obj_request->object_name = memcpy(name, object_name, size);
1501 	obj_request->offset = offset;
1502 	obj_request->length = length;
1503 	obj_request->flags = 0;
/* BAD_WHICH: not yet part of any image request. */
1504 	obj_request->which = BAD_WHICH;
1505 	obj_request->type = type;
1506 	INIT_LIST_HEAD(&obj_request->links);
1507 	init_completion(&obj_request->completion);
1508 	kref_init(&obj_request->kref);
1510 	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1511 		offset, length, (int)type, obj_request);
/*
 * kref release function for an object request.  Frees the attached osd
 * request and any data payload (bio chain or page vector) according to
 * the request type; the request itself (with its embedded name copy) is
 * freed in lines elided at the end of this listing span.
 */
1516 static void rbd_obj_request_destroy(struct kref *kref)
1518 	struct rbd_obj_request *obj_request;
1520 	obj_request = container_of(kref, struct rbd_obj_request, kref);
1522 	dout("%s: obj %p\n", __func__, obj_request);
/* Must already be detached from any image request. */
1524 	rbd_assert(obj_request->img_request == NULL);
1525 	rbd_assert(obj_request->which == BAD_WHICH);
1527 	if (obj_request->osd_req)
1528 		rbd_osd_req_destroy(obj_request->osd_req);
1530 	rbd_assert(obj_request_type_valid(obj_request->type));
1531 	switch (obj_request->type) {
1532 	case OBJ_REQUEST_NODATA:
1533 		break;		/* Nothing to do */
1534 	case OBJ_REQUEST_BIO:
1535 		if (obj_request->bio_list)
1536 			bio_chain_put(obj_request->bio_list);
1538 	case OBJ_REQUEST_PAGES:
1539 		if (obj_request->pages)
1540 			ceph_release_page_vector(obj_request->pages,
1541 						obj_request->page_count);
/*
 * Allocate and initialize an image request covering [offset, offset+length)
 * of the image.  For writes, a reference to the device's current snapshot
 * context is taken under header_rwsem; for reads, the mapped snapshot id
 * is recorded instead.
 */
1549  * Caller is responsible for filling in the list of object requests
1550  * that comprises the image request, and the Linux request pointer
1551  * (if there is one).
1553 static struct rbd_img_request *rbd_img_request_create(
1554 					struct rbd_device *rbd_dev,
1555 					u64 offset, u64 length,
1559 	struct rbd_img_request *img_request;
1560 	struct ceph_snap_context *snapc = NULL;
/* GFP_ATOMIC: callable from the block request function. */
1562 	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1566 	if (write_request) {
1567 		down_read(&rbd_dev->header_rwsem);
1568 		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1569 		up_read(&rbd_dev->header_rwsem);
1570 		if (WARN_ON(!snapc)) {
/* NOTE(review): kfree(img_request) is presumably in the elided line
 * before this early return — TODO confirm. */
1572 			return NULL;	/* Shouldn't happen */
1577 	img_request->rq = NULL;
1578 	img_request->rbd_dev = rbd_dev;
1579 	img_request->offset = offset;
1580 	img_request->length = length;
1581 	img_request->flags = 0;
1582 	if (write_request) {
1583 		img_request_write_set(img_request);
/* The image request now owns the snapc reference taken above. */
1584 		img_request->snapc = snapc;
1586 		img_request->snap_id = rbd_dev->spec->snap_id;
1589 		img_request_child_set(img_request);
1590 	if (rbd_dev->parent_spec)
1591 		img_request_layered_set(img_request);
1592 	spin_lock_init(&img_request->completion_lock);
1593 	img_request->next_completion = 0;
1594 	img_request->callback = NULL;
1595 	img_request->result = 0;
1596 	img_request->obj_request_count = 0;
1597 	INIT_LIST_HEAD(&img_request->obj_requests);
1598 	kref_init(&img_request->kref);
/* Scaffolding to silence unused-symbol warnings while layering lands. */
1600 	(void) img_request_layered_test(img_request);	/* Avoid a warning */
1601 	rbd_img_request_get(img_request);	/* Avoid a warning */
1602 	rbd_img_request_put(img_request);	/* TEMPORARY */
1604 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1605 		write_request ? "write" : "read", offset, length,
/*
 * kref release function for an image request: detach and drop every
 * object request, release the write-path snapshot context reference,
 * then (in elided trailing lines) free the image request itself.
 */
1611 static void rbd_img_request_destroy(struct kref *kref)
1613 	struct rbd_img_request *img_request;
1614 	struct rbd_obj_request *obj_request;
1615 	struct rbd_obj_request *next_obj_request;
1617 	img_request = container_of(kref, struct rbd_img_request, kref);
1619 	dout("%s: img %p\n", __func__, img_request);
/* _safe variant: rbd_img_obj_request_del() unlinks as we iterate. */
1621 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1622 		rbd_img_obj_request_del(img_request, obj_request);
1623 	rbd_assert(img_request->obj_request_count == 0);
1625 	if (img_request_write_test(img_request))
1626 		ceph_put_snap_context(img_request->snapc);
/*
 * Account one finished object request against the block-layer request
 * that spawned the image request.  Records the first error seen on the
 * image request and returns blk_end_request()'s "more to do" flag.
 */
1631 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1633 	struct rbd_img_request *img_request;
1634 	unsigned int xferred;
1637 	rbd_assert(obj_request_img_data_test(obj_request));
1638 	img_request = obj_request->img_request;
/* Only top-level (non-child) image requests carry a block request. */
1640 	rbd_assert(!img_request_child_test(img_request));
1641 	rbd_assert(img_request->rq != NULL);
/* blk_end_request() takes an unsigned int byte count. */
1643 	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1644 	xferred = (unsigned int)obj_request->xferred;
1645 	result = obj_request->result;
/* Error path (condition line elided at the gap): log the failure. */
1647 		struct rbd_device *rbd_dev = img_request->rbd_dev;
1649 		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1650 			img_request_write_test(img_request) ? "write" : "read",
1651 			obj_request->length, obj_request->img_offset,
1652 			obj_request->offset);
1653 		rbd_warn(rbd_dev, "  result %d xferred %x\n",
/* Only the first error is propagated to the image request. */
1655 		if (!img_request->result)
1656 			img_request->result = result;
1659 	return blk_end_request(img_request->rq, result, xferred);
/*
 * Per-object completion callback for image requests.  Object requests
 * must be reported to the block layer in order; this walks forward from
 * next_completion, ending each consecutive request that has finished,
 * under completion_lock.  Completes the whole image request when the
 * last object request has been ended.
 */
1662 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1664 	struct rbd_img_request *img_request;
1665 	u32 which = obj_request->which;
1668 	rbd_assert(obj_request_img_data_test(obj_request));
1669 	img_request = obj_request->img_request;
1671 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1672 	rbd_assert(img_request != NULL);
1673 	rbd_assert(img_request->obj_request_count > 0);
1674 	rbd_assert(which != BAD_WHICH);
1675 	rbd_assert(which < img_request->obj_request_count);
1676 	rbd_assert(which >= img_request->next_completion);
1678 	spin_lock_irq(&img_request->completion_lock);
/* An out-of-order completion just parks until its turn comes. */
1679 	if (which != img_request->next_completion)
1682 	for_each_obj_request_from(img_request, obj_request) {
1684 		rbd_assert(which < img_request->obj_request_count);
/* Stop at the first request that hasn't completed yet. */
1686 		if (!obj_request_done_test(obj_request))
1688 		more = rbd_img_obj_end_request(obj_request);
/* "more" is false exactly when the final object request was ended. */
1692 	rbd_assert(more ^ (which == img_request->obj_request_count));
1693 	img_request->next_completion = which;
1695 	spin_unlock_irq(&img_request->completion_lock);
1698 		rbd_img_request_complete(img_request);
/*
 * Split an image request's bio chain into per-object requests.  Each
 * rados object covers a fixed-size segment of the image; for every
 * segment touched we clone the relevant bio range, build a single-op
 * extent osd request, and add it to the image request.  Returns 0 on
 * success or a negative errno (cleanup path at the end drops any
 * object requests created so far).
 */
1701 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1702 					struct bio *bio_list)
1704 	struct rbd_device *rbd_dev = img_request->rbd_dev;
1705 	struct rbd_obj_request *obj_request = NULL;
1706 	struct rbd_obj_request *next_obj_request;
1707 	bool write_request = img_request_write_test(img_request);
1708 	unsigned int bio_offset;
1713 	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1715 	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1717 	img_offset = img_request->offset;
/* The bio chain's start must match the image offset we were given. */
1718 	rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1719 	resid = img_request->length;
1720 	rbd_assert(resid > 0);
/* Loop head (while (resid)) is elided at the listing gap here. */
1722 		struct ceph_osd_request *osd_req;
1723 		const char *object_name;
1724 		unsigned int clone_size;
1728 		object_name = rbd_segment_name(rbd_dev, img_offset);
/* offset/length are clipped to the current object's segment. */
1731 		offset = rbd_segment_offset(rbd_dev, img_offset);
1732 		length = rbd_segment_length(rbd_dev, img_offset, resid);
1733 		obj_request = rbd_obj_request_create(object_name,
1736 		kfree(object_name);	/* object request has its own copy */
1740 		rbd_assert(length <= (u64) UINT_MAX);
1741 		clone_size = (unsigned int) length;
1742 		obj_request->bio_list = bio_chain_clone_range(&bio_list,
1743 						&bio_offset, clone_size,
1745 		if (!obj_request->bio_list)
1748 		osd_req = rbd_osd_req_create(rbd_dev, write_request,
1752 		obj_request->osd_req = osd_req;
1753 		obj_request->callback = rbd_img_obj_callback;
1755 		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1757 		osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1758 				obj_request->bio_list, obj_request->length);
1759 		rbd_osd_req_format(obj_request, write_request);
1761 		obj_request->img_offset = img_offset;
1762 		rbd_img_obj_request_add(img_request, obj_request);
1764 		img_offset += length;
/* Error-unwind labels are elided; this releases the partly-built
 * request and every object request already linked in. */
1771 	rbd_obj_request_put(obj_request);
1773 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1774 		rbd_obj_request_put(obj_request);
/*
 * Submit every object request belonging to an image request to the osd
 * client.  Stops at the first submission error (return handling is in
 * elided lines).  The image request keeps its own reference to each
 * object request, so the extra reference is dropped after submission.
 */
1779 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1781 	struct rbd_device *rbd_dev = img_request->rbd_dev;
1782 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1783 	struct rbd_obj_request *obj_request;
1784 	struct rbd_obj_request *next_obj_request;
1786 	dout("%s: img %p\n", __func__, img_request);
1787 	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1790 		ret = rbd_obj_request_submit(osdc, obj_request);
1794 		 * The image request has its own reference to each
1795 		 * of its object requests, so we can safely drop the
1798 		rbd_obj_request_put(obj_request);
/*
 * Send a NOTIFY_ACK for a watch notification on the header object.
 * Fire-and-forget: the trivial put callback releases the object request
 * when the ack completes.  Returns 0 or a negative errno.
 */
1804 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1805 				   u64 ver, u64 notify_id)
1807 	struct rbd_obj_request *obj_request;
1808 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1811 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1812 							OBJ_REQUEST_NODATA);
1817 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1818 	if (!obj_request->osd_req)
/* On completion, just drop the reference — nobody waits for the ack. */
1820 	obj_request->callback = rbd_obj_request_put;
1822 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1824 	rbd_osd_req_format(obj_request, false);
1826 	ret = rbd_obj_request_submit(osdc, obj_request);
/* Error-path put (condition elided): release on submission failure. */
1829 		rbd_obj_request_put(obj_request);
/*
 * Watch callback invoked by the osd client when the header object
 * changes: refresh the device's view of the image header, then
 * acknowledge the notification so the notifier can make progress.
 */
1834 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1836 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1843 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1844 		rbd_dev->header_name, (unsigned long long) notify_id,
1845 		(unsigned int) opcode);
1846 	rc = rbd_dev_refresh(rbd_dev, &hver);
/* Ack even if the refresh failed; the warning records the failure. */
1848 		rbd_warn(rbd_dev, "got notification but failed to "
1849 			   " update snaps: %d\n", rc);
1851 	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
/*
 * Synchronously set up (start != 0) or tear down (start == 0) the watch
 * on the device's header object.  On start, the osd request is marked
 * lingering and retained in rbd_dev->watch_request until teardown.
 * Returns 0 or a negative errno; several branch/label lines are elided
 * at the listing gaps.
 */
1855  * Request sync osd watch/unwatch.  The value of "start" determines
1856  * whether a watch request is being initiated or torn down.
1858 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1860 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1861 	struct rbd_obj_request *obj_request;
/* Starting implies no event/request yet; stopping implies both exist. */
1864 	rbd_assert(start ^ !!rbd_dev->watch_event);
1865 	rbd_assert(start ^ !!rbd_dev->watch_request);
1868 		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1869 						&rbd_dev->watch_event);
1872 		rbd_assert(rbd_dev->watch_event != NULL);
1876 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1877 							OBJ_REQUEST_NODATA);
/* Watch is issued as a write request (it mutates watch state on the OSD). */
1881 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1882 	if (!obj_request->osd_req)
1886 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
/* Teardown path: stop the previous lingering watch request. */
1888 		ceph_osdc_unregister_linger_request(osdc,
1889 					rbd_dev->watch_request->osd_req);
1891 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1892 				rbd_dev->watch_event->cookie,
1893 				rbd_dev->header.obj_version, start);
1894 	rbd_osd_req_format(obj_request, true);
1896 	ret = rbd_obj_request_submit(osdc, obj_request);
1899 	ret = rbd_obj_request_wait(obj_request);
1902 	ret = obj_request->result;
1907 	 * A watch request is set to linger, so the underlying osd
1908 	 * request won't go away until we unregister it.  We retain
1909 	 * a pointer to the object request during that time (in
1910 	 * rbd_dev->watch_request), so we'll keep a reference to
1911 	 * it.  We'll drop that reference (below) after we've
1915 		rbd_dev->watch_request = obj_request;
1920 	/* We have successfully torn down the watch request */
1922 	rbd_obj_request_put(rbd_dev->watch_request);
1923 	rbd_dev->watch_request = NULL;
1925 	/* Cancel the event if we're tearing down, or on error */
1926 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1927 	rbd_dev->watch_event = NULL;
1929 		rbd_obj_request_put(obj_request);
/*
 * Synchronously invoke a rados class method on an object.  Outbound
 * parameters (if any) are sent via a pagelist; the reply is copied from
 * a page vector into the caller's inbound buffer.  Returns a negative
 * errno on failure; the success return (number of bytes, per the elided
 * tail) should be confirmed against the full source.
 */
1935  * Synchronous osd object method call
1937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1938 			     const char *object_name,
1939 			     const char *class_name,
1940 			     const char *method_name,
1941 			     const char *outbound,
1942 			     size_t outbound_size,
1944 			     size_t inbound_size,
1947 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1948 	struct rbd_obj_request *obj_request;
1949 	struct page **pages;
1954 	 * Method calls are ultimately read operations.  The result
1955 	 * should placed into the inbound buffer provided.  They
1956 	 * also supply outbound data--parameters for the object
1957 	 * method.  Currently if this is present it will be a
1960 	page_count = (u32) calc_pages_for(0, inbound_size);
1961 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1963 		return PTR_ERR(pages);
1966 	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
/* Page vector ownership passes to the object request; it is released
 * by rbd_obj_request_destroy() on the normal path. */
1971 	obj_request->pages = pages;
1972 	obj_request->page_count = page_count;
1974 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1975 	if (!obj_request->osd_req)
1978 	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1979 					class_name, method_name);
1980 	if (outbound_size) {
1981 		struct ceph_pagelist *pagelist;
1983 		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1987 		ceph_pagelist_init(pagelist);
1988 		ceph_pagelist_append(pagelist, outbound, outbound_size);
1989 		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1992 	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1993 					obj_request->pages, inbound_size,
1995 	rbd_osd_req_format(obj_request, false);
1997 	ret = rbd_obj_request_submit(osdc, obj_request);
2000 	ret = rbd_obj_request_wait(obj_request);
2004 	ret = obj_request->result;
2008 	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
/* Optional out-parameter (NULL check elided at the gap). */
2010 		*version = obj_request->version;
2013 		rbd_obj_request_put(obj_request);
/* Error path only: free pages not yet owned by an object request. */
2015 		ceph_release_page_vector(pages, page_count);
/*
 * Block-layer request function.  Pulls requests off the queue, filters
 * out non-FS and zero-length requests, builds an image request per
 * block request, fills it from the request's bio chain and submits it.
 * The queue lock is dropped while building/submitting and re-taken
 * before fetching the next request (hence the sparse annotations).
 */
2020 static void rbd_request_fn(struct request_queue *q)
2021 		__releases(q->queue_lock) __acquires(q->queue_lock)
2023 	struct rbd_device *rbd_dev = q->queuedata;
2024 	bool read_only = rbd_dev->mapping.read_only;
2028 	while ((rq = blk_fetch_request(q))) {
2029 		bool write_request = rq_data_dir(rq) == WRITE;
2030 		struct rbd_img_request *img_request;
2034 		/* Ignore any non-FS requests that filter through. */
2036 		if (rq->cmd_type != REQ_TYPE_FS) {
2037 			dout("%s: non-fs request type %d\n", __func__,
2038 				(int) rq->cmd_type);
2039 			__blk_end_request_all(rq, 0);
2043 		/* Ignore/skip any zero-length requests */
2045 		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2046 		length = (u64) blk_rq_bytes(rq);
2049 			dout("%s: zero-length request\n", __func__);
2050 			__blk_end_request_all(rq, 0);
/* Heavy lifting happens without the queue lock held. */
2054 		spin_unlock_irq(q->queue_lock);
2056 		/* Disallow writes to a read-only device */
2058 		if (write_request) {
/* A mapped snapshot is always read-only; writes imply HEAD. */
2062 			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2066 		 * Quit early if the mapped snapshot no longer
2067 		 * exists.  It's still possible the snapshot will
2068 		 * have disappeared by the time our request arrives
2069 		 * at the osd, but there's no sense in sending it if
2072 		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2073 			dout("request for non-existent snapshot");
2074 			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
/* Guard against offset+length wrapping past U64_MAX. */
2080 		if (WARN_ON(offset && length > U64_MAX - offset + 1))
2081 			goto end_request;	/* Shouldn't happen */
2084 		img_request = rbd_img_request_create(rbd_dev, offset, length,
2085 							write_request, false);
2089 		img_request->rq = rq;
2091 		result = rbd_img_request_fill_bio(img_request, rq->bio);
2093 			result = rbd_img_request_submit(img_request);
/* Failure path (condition elided): drop the image request. */
2095 			rbd_img_request_put(img_request);
2097 		spin_lock_irq(q->queue_lock);
2099 			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2100 				write_request ? "write" : "read",
2101 				length, offset, result);
2103 			__blk_end_request_all(rq, result);
/*
 * bvec merge callback: limit how many bytes may be added to a bio so
 * that it does not cross a rados object boundary.
 */
2109  * a queue callback. Makes sure that we don't create a bio that spans across
2110  * multiple osd objects. One exception would be with a single page bios,
2111  * which we handle later at bio_chain_clone_range()
2113 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2114 			  struct bio_vec *bvec)
2116 	struct rbd_device *rbd_dev = q->queuedata;
2117 	sector_t sector_offset;
2118 	sector_t sectors_per_obj;
2119 	sector_t obj_sector_offset;
2123 	 * Find how far into its rbd object the partition-relative
2124 	 * bio start sector is to offset relative to the enclosing
2127 	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
/* obj_order is log2(object size); objects are power-of-two sized,
 * so the mask below extracts the intra-object offset. */
2128 	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2129 	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2132 	 * Compute the number of bytes from that offset to the end
2133 	 * of the object.  Account for what's already used by the bio.
2135 	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2136 	if (ret > bmd->bi_size)
2137 		ret -= bmd->bi_size;
2142 	 * Don't send back more than was asked for.  And if the bio
2143 	 * was empty, let the whole thing through because:  "Note
2144 	 * that a block device *must* allow a single page to be
2145 	 * added to an empty bio."
2147 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2148 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2149 		ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk and its request queue.  The GENHD_FL_UP branch
 * (del_gendisk, presumably, in the elided line) runs only if the disk
 * was actually added.
 */
2154 static void rbd_free_disk(struct rbd_device *rbd_dev)
2156 	struct gendisk *disk = rbd_dev->disk;
2161 	if (disk->flags & GENHD_FL_UP)
2164 		blk_cleanup_queue(disk->queue);
/*
 * Synchronously read [offset, offset+length) of an object into buf via
 * a temporary page vector.  Optionally returns the object version.
 * Returns the number of bytes read (per the INT_MAX assert and elided
 * tail — TODO confirm) or a negative errno.
 */
2168 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2169 				const char *object_name,
2170 				u64 offset, u64 length,
2171 				char *buf, u64 *version)
2174 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2175 	struct rbd_obj_request *obj_request;
2176 	struct page **pages = NULL;
2181 	page_count = (u32) calc_pages_for(offset, length);
2182 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2184 		ret = PTR_ERR(pages);
2187 	obj_request = rbd_obj_request_create(object_name, offset, length,
/* Pages now owned by the object request (freed on its destruction). */
2192 	obj_request->pages = pages;
2193 	obj_request->page_count = page_count;
2195 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2196 	if (!obj_request->osd_req)
2199 	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2200 					offset, length, 0, 0);
2201 	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2203 					obj_request->length,
2204 					obj_request->offset & ~PAGE_MASK,
2206 	rbd_osd_req_format(obj_request, false);
2208 	ret = rbd_obj_request_submit(osdc, obj_request);
2211 	ret = rbd_obj_request_wait(obj_request);
2215 	ret = obj_request->result;
2219 	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2220 	size = (size_t) obj_request->xferred;
2221 	ceph_copy_from_page_vector(pages, buf, 0, size);
2222 	rbd_assert(size <= (size_t) INT_MAX);
/* Optional out-parameter (NULL check elided at the gap). */
2225 		*version = obj_request->version;
2228 		rbd_obj_request_put(obj_request);
/* Error path only: pages not yet handed to an object request. */
2230 		ceph_release_page_vector(pages, page_count);
/*
 * Fetch and validate the on-disk (format 1) header object.
 */
2236  * Read the complete header for the given rbd device.
2238  * Returns a pointer to a dynamically-allocated buffer containing
2239  * the complete and validated header.  Caller can pass the address
2240  * of a variable that will be filled in with the version of the
2241  * header object at the time it was read.
2243  * Returns a pointer-coded errno if a failure occurs.
2245 static struct rbd_image_header_ondisk *
2246 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2248 	struct rbd_image_header_ondisk *ondisk = NULL;
2255 	 * The complete header will include an array of its 64-bit
2256 	 * snapshot ids, followed by the names of those snapshots as
2257 	 * a contiguous block of NUL-terminated strings.  Note that
2258 	 * the number of snapshots could change by the time we read
2259 	 * it in, in which case we re-read it.
/* Loop head (do {) and names_size accounting are elided at the gaps;
 * the block below re-reads until the snapshot count stabilizes. */
2266 		size = sizeof (*ondisk);
2267 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2269 		ondisk = kmalloc(size, GFP_KERNEL);
2271 			return ERR_PTR(-ENOMEM);
2273 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2275 				       (char *) ondisk, version);
/* A read shorter than the computed size means a truncated header. */
2278 		if (WARN_ON((size_t) ret < size)) {
2280 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2284 		if (!rbd_dev_ondisk_valid(ondisk)) {
2286 			rbd_warn(rbd_dev, "invalid header");
2290 		names_size = le64_to_cpu(ondisk->snap_names_len);
2291 		want_count = snap_count;
2292 		snap_count = le32_to_cpu(ondisk->snap_count);
2293 	} while (snap_count != want_count);
/* Error exit (label elided): free the buffer, return coded errno. */
2300 	return ERR_PTR(ret);
/*
 * Read the on-disk v1 header and decode it into *header.
 * Returns 0 on success or a negative errno.
 */
2304  * reload the ondisk the header
2306 static int rbd_read_header(struct rbd_device *rbd_dev,
2307 			   struct rbd_image_header *header)
2309 	struct rbd_image_header_ondisk *ondisk;
2313 	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2315 		return PTR_ERR(ondisk);
2316 	ret = rbd_header_from_disk(header, ondisk);
/* Version is recorded only when decode succeeded (condition elided). */
2318 		header->obj_version = ver;
/*
 * Remove every snapshot device hanging off this rbd device.
 * Safe iteration: rbd_remove_snap_dev() unlinks the entry.
 */
2324 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2326 	struct rbd_snap *snap;
2327 	struct rbd_snap *next;
2329 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2330 		rbd_remove_snap_dev(snap);
/*
 * Propagate a changed image size to the mapping and the gendisk.
 * Only applies when the base image (HEAD) is mapped — a mapped
 * snapshot's size is fixed.
 */
2333 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2337 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2340 	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2341 	dout("setting size to %llu sectors", (unsigned long long) size);
2342 	rbd_dev->mapping.size = (u64) size;
2343 	set_capacity(rbd_dev->disk, size);
/*
 * Refresh a format-1 device's header from the osd: re-read the on-disk
 * header, then swap the new size/snapshot data into rbd_dev->header
 * under header_rwsem and update the device's snapshot list.
 */
2347  * only read the first part of the ondisk header, without the snaps info
2349 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2352 	struct rbd_image_header h;
2354 	ret = rbd_read_header(rbd_dev, &h);
2358 	down_write(&rbd_dev->header_rwsem);
2360 	/* Update image size, and check for resize of mapped image */
2361 	rbd_dev->header.image_size = h.image_size;
2362 	rbd_update_mapping_size(rbd_dev);
2364 	/* rbd_dev->header.object_prefix shouldn't change */
2365 	kfree(rbd_dev->header.snap_sizes);
2366 	kfree(rbd_dev->header.snap_names);
2367 	/* osd requests may still refer to snapc */
/* put, not kfree: lingering requests may still hold references. */
2368 	ceph_put_snap_context(rbd_dev->header.snapc);
/* Optional out-parameter (NULL check elided at the gap). */
2371 		*hver = h.obj_version;
2372 	rbd_dev->header.obj_version = h.obj_version;
2373 	rbd_dev->header.image_size = h.image_size;
2374 	rbd_dev->header.snapc = h.snapc;
2375 	rbd_dev->header.snap_names = h.snap_names;
2376 	rbd_dev->header.snap_sizes = h.snap_sizes;
2377 	/* Free the extra copy of the object prefix */
2378 	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2379 	kfree(h.object_prefix);
2381 	ret = rbd_dev_snaps_update(rbd_dev);
/* Register only if the update succeeded (condition elided). */
2383 		ret = rbd_dev_snaps_register(rbd_dev);
2385 	up_write(&rbd_dev->header_rwsem);
/*
 * Refresh the device header, dispatching on image format (1 or 2).
 * Serialized under ctl_mutex; returns the format-specific result.
 */
2390 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2394 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2395 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2396 	if (rbd_dev->image_format == 1)
2397 		ret = rbd_dev_v1_refresh(rbd_dev, hver);
2399 		ret = rbd_dev_v2_refresh(rbd_dev, hver);
2400 	mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for a mapped
 * device.  I/O limits are sized to the rados object size so requests
 * line up with objects; rbd_merge_bvec enforces the same boundary for
 * bio merging.  Error-unwind labels are elided at the listing gaps.
 */
2405 static int rbd_init_disk(struct rbd_device *rbd_dev)
2407 	struct gendisk *disk;
2408 	struct request_queue *q;
2411 	/* create gendisk info */
2412 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2416 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2418 	disk->major = rbd_dev->major;
2419 	disk->first_minor = 0;
2420 	disk->fops = &rbd_bd_ops;
2421 	disk->private_data = rbd_dev;
2423 	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2427 	/* We use the default size, but let's be explicit about it. */
2428 	blk_queue_physical_block_size(q, SECTOR_SIZE);
2430 	/* set io sizes to object size */
2431 	segment_size = rbd_obj_bytes(&rbd_dev->header);
2432 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2433 	blk_queue_max_segment_size(q, segment_size);
2434 	blk_queue_io_min(q, segment_size);
2435 	blk_queue_io_opt(q, segment_size);
2437 	blk_queue_merge_bvec(q, rbd_merge_bvec);
2440 	q->queuedata = rbd_dev;
2442 	rbd_dev->disk = disk;
/* mapping.size is in bytes; capacity is in 512-byte sectors. */
2444 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2457 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2459 return container_of(dev, struct rbd_device, dev);
2462 static ssize_t rbd_size_show(struct device *dev,
2463 struct device_attribute *attr, char *buf)
2465 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2468 down_read(&rbd_dev->header_rwsem);
2469 size = get_capacity(rbd_dev->disk);
2470 up_read(&rbd_dev->header_rwsem);
2472 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2476 * Note this shows the features for whatever's mapped, which is not
2477 * necessarily the base image.
2479 static ssize_t rbd_features_show(struct device *dev,
2480 struct device_attribute *attr, char *buf)
2482 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2484 return sprintf(buf, "0x%016llx\n",
2485 (unsigned long long) rbd_dev->mapping.features);
2488 static ssize_t rbd_major_show(struct device *dev,
2489 struct device_attribute *attr, char *buf)
2491 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2493 return sprintf(buf, "%d\n", rbd_dev->major);
2496 static ssize_t rbd_client_id_show(struct device *dev,
2497 struct device_attribute *attr, char *buf)
2499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2501 return sprintf(buf, "client%lld\n",
2502 ceph_client_id(rbd_dev->rbd_client->client));
2505 static ssize_t rbd_pool_show(struct device *dev,
2506 struct device_attribute *attr, char *buf)
2508 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2510 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2513 static ssize_t rbd_pool_id_show(struct device *dev,
2514 struct device_attribute *attr, char *buf)
2516 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2518 return sprintf(buf, "%llu\n",
2519 (unsigned long long) rbd_dev->spec->pool_id);
2522 static ssize_t rbd_name_show(struct device *dev,
2523 struct device_attribute *attr, char *buf)
2525 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2527 if (rbd_dev->spec->image_name)
2528 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2530 return sprintf(buf, "(unknown)\n");
2533 static ssize_t rbd_image_id_show(struct device *dev,
2534 struct device_attribute *attr, char *buf)
2536 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2538 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2542 * Shows the name of the currently-mapped snapshot (or
2543 * RBD_SNAP_HEAD_NAME for the base image).
2545 static ssize_t rbd_snap_show(struct device *dev,
2546 struct device_attribute *attr,
2549 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2551 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
/*
 * sysfs "parent" attribute, multi-line output.  The bufp cursor
 * declaration, the !spec early-out condition, and the `bufp += count`
 * advances between the sprintf calls are elided at the listing gaps.
 */
2555  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2556  * for the parent image.  If there is no parent, simply shows
2557  * "(no parent image)".
2559 static ssize_t rbd_parent_show(struct device *dev,
2560 			     struct device_attribute *attr,
2563 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2564 	struct rbd_spec *spec = rbd_dev->parent_spec;
2569 		return sprintf(buf, "(no parent image)\n");
2571 	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2572 			(unsigned long long) spec->pool_id, spec->pool_name);
2577 	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2578 			spec->image_name ? spec->image_name : "(unknown)");
2583 	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2584 			(unsigned long long) spec->snap_id, spec->snap_name);
2589 	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
/* Total bytes written is the cursor's distance from the buffer start. */
2594 	return (ssize_t) (bufp - buf);
/*
 * sysfs "refresh" store handler: force a header refresh from the osds.
 * Returns the attribute write size on success, or the refresh errno.
 * NOTE(review): the signature continuation (buf/size parameters) and
 * the `int ret;` declaration are elided at the listing gaps.
 */
2597 static ssize_t rbd_image_refresh(struct device *dev,
2598 				 struct device_attribute *attr,
2602 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2605 	ret = rbd_dev_refresh(rbd_dev, NULL);
2607 	return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is a write-only trigger (see rbd_image_refresh()).
 */
2610 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2611 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2612 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2613 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2614 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2615 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2616 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2617 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2618 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2619 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2620 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
/*
 * Attribute table, group, and device type wiring for rbd devices.
 * Several initializer lines (e.g. .attrs, NULL terminators, .name)
 * are elided at the listing gaps.
 */
2622 static struct attribute *rbd_attrs[] = {
2623 	&dev_attr_size.attr,
2624 	&dev_attr_features.attr,
2625 	&dev_attr_major.attr,
2626 	&dev_attr_client_id.attr,
2627 	&dev_attr_pool.attr,
2628 	&dev_attr_pool_id.attr,
2629 	&dev_attr_name.attr,
2630 	&dev_attr_image_id.attr,
2631 	&dev_attr_current_snap.attr,
2632 	&dev_attr_parent.attr,
2633 	&dev_attr_refresh.attr,
2637 static struct attribute_group rbd_attr_group = {
2641 static const struct attribute_group *rbd_attr_groups[] = {
/* Empty release: device lifetime is managed by the driver itself. */
2646 static void rbd_sysfs_dev_release(struct device *dev)
2650 static struct device_type rbd_device_type = {
2652 	.groups		= rbd_attr_groups,
2653 	.release	= rbd_sysfs_dev_release,
/*
 * Per-snapshot sysfs attributes: size, id and feature mask of a
 * single snapshot, each recovered from the embedded struct device.
 * NOTE(review): excerpt — some original lines are elided here.
 */
2661 static ssize_t rbd_snap_size_show(struct device *dev,
2662 struct device_attribute *attr,
2665 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2667 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2670 static ssize_t rbd_snap_id_show(struct device *dev,
2671 struct device_attribute *attr,
2674 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2676 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2679 static ssize_t rbd_snap_features_show(struct device *dev,
2680 struct device_attribute *attr,
2683 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2685 return sprintf(buf, "0x%016llx\n",
2686 (unsigned long long) snap->features);
2689 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2690 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2691 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2693 static struct attribute *rbd_snap_attrs[] = {
2694 &dev_attr_snap_size.attr,
2695 &dev_attr_snap_id.attr,
2696 &dev_attr_snap_features.attr,
2700 static struct attribute_group rbd_snap_attr_group = {
2701 .attrs = rbd_snap_attrs,
/* Release callback for the snap device; frees the rbd_snap (body elided). */
2704 static void rbd_snap_dev_release(struct device *dev)
2706 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2711 static const struct attribute_group *rbd_snap_attr_groups[] = {
2712 &rbd_snap_attr_group,
2716 static struct device_type rbd_snap_device_type = {
2717 .groups = rbd_snap_attr_groups,
2718 .release = rbd_snap_dev_release,
/*
 * Reference-counted rbd_spec lifecycle: get/put wrap kref; alloc
 * zero-initializes and primes the kref; free releases the owned
 * name strings and (in elided lines) the spec itself.
 */
2721 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2723 kref_get(&spec->kref);
2728 static void rbd_spec_free(struct kref *kref);
2729 static void rbd_spec_put(struct rbd_spec *spec)
2732 kref_put(&spec->kref, rbd_spec_free);
2735 static struct rbd_spec *rbd_spec_alloc(void)
2737 struct rbd_spec *spec;
2739 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2742 kref_init(&spec->kref);
/* Temporary get/put pair: exercises the refcount path during bring-up. */
2744 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2749 static void rbd_spec_free(struct kref *kref)
2751 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2753 kfree(spec->pool_name);
2754 kfree(spec->image_id);
2755 kfree(spec->image_name);
2756 kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd_device.  Takes ownership of the
 * caller's references on rbdc and spec (released in rbd_dev_destroy).
 * The ceph file layout is fixed: stripe unit == object size, one
 * stripe per object, pool taken from the spec.
 * NOTE(review): excerpt — some original lines are elided here.
 */
2760 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2761 struct rbd_spec *spec)
2763 struct rbd_device *rbd_dev;
2765 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2769 spin_lock_init(&rbd_dev->lock);
2771 INIT_LIST_HEAD(&rbd_dev->node);
2772 INIT_LIST_HEAD(&rbd_dev->snaps);
2773 init_rwsem(&rbd_dev->header_rwsem);
2775 rbd_dev->spec = spec;
2776 rbd_dev->rbd_client = rbdc;
2778 /* Initialize the layout used for all rbd requests */
2780 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2781 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2782 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2783 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/* Counterpart of rbd_dev_create: drops the owned spec/client refs. */
2788 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2790 rbd_spec_put(rbd_dev->parent_spec);
2791 kfree(rbd_dev->header_name);
2792 rbd_put_client(rbd_dev->rbd_client);
2793 rbd_spec_put(rbd_dev->spec);
/*
 * Snapshot device helpers.  rbd_snap_registered() sanity-checks that
 * the type assignment and registration state agree (the XOR assert
 * fires if exactly one of the two holds).
 */
2797 static bool rbd_snap_registered(struct rbd_snap *snap)
2799 bool ret = snap->dev.type == &rbd_snap_device_type;
2800 bool reg = device_is_registered(&snap->dev);
2802 rbd_assert(!ret ^ reg);
/* Unlink a snapshot from the device list and tear down its sysfs node. */
2807 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2809 list_del(&snap->node);
2810 if (device_is_registered(&snap->dev))
2811 device_unregister(&snap->dev);
/*
 * Register a snapshot's struct device under the parent rbd device,
 * named "snap_<name>" (RBD_SNAP_DEV_NAME_PREFIX).
 */
2814 static int rbd_register_snap_dev(struct rbd_snap *snap,
2815 struct device *parent)
2817 struct device *dev = &snap->dev;
2820 dev->type = &rbd_snap_device_type;
2821 dev->parent = parent;
2822 dev->release = rbd_snap_dev_release;
2823 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2824 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2826 ret = device_register(dev);
/*
 * Allocate an rbd_snap and populate it with a duplicated name plus
 * the given id/size/features.  Returns ERR_PTR on allocation failure
 * (error unwinding is in elided lines).
 */
2831 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2832 const char *snap_name,
2833 u64 snap_id, u64 snap_size,
2836 struct rbd_snap *snap;
2839 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2841 return ERR_PTR(-ENOMEM);
2844 snap->name = kstrdup(snap_name, GFP_KERNEL);
2849 snap->size = snap_size;
2850 snap->features = snap_features;
2858 return ERR_PTR(ret);
/*
 * Format 1 snapshot info: size comes from the header's snap_sizes
 * array, features are always 0, and the name is found by walking the
 * NUL-separated snap_names blob "which" entries forward.
 */
2861 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2862 u64 *snap_size, u64 *snap_features)
2866 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2868 *snap_size = rbd_dev->header.snap_sizes[which];
2869 *snap_features = 0; /* No features for v1 */
2871 /* Skip over names until we find the one we are looking for */
2873 snap_name = rbd_dev->header.snap_names;
2875 snap_name += strlen(snap_name) + 1;
2881 * Get the size and object order for an image snapshot, or if
2882 * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Queries the "get_size" OSD class method on the header object and
 * decodes the packed { order, size } reply (size is little-endian).
 * NOTE(review): excerpt — some original lines are elided here.
 */
2885 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2886 u8 *order, u64 *snap_size)
2888 __le64 snapid = cpu_to_le64(snap_id);
2893 } __attribute__ ((packed)) size_buf = { 0 };
2895 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2897 (char *) &snapid, sizeof (snapid),
2898 (char *) &size_buf, sizeof (size_buf), NULL);
2899 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2903 *order = size_buf.order;
2904 *snap_size = le64_to_cpu(size_buf.size);
2906 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2907 (unsigned long long) snap_id, (unsigned int) *order,
2908 (unsigned long long) *snap_size);
/* Convenience wrapper: base image (CEPH_NOSNAP) size and object order. */
2913 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2915 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2916 &rbd_dev->header.obj_order,
2917 &rbd_dev->header.image_size);
/*
 * Fetch the image's object-name prefix via "get_object_prefix" and
 * store the decoded string in the header (ownership kept by header).
 */
2920 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2926 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2930 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2931 "rbd", "get_object_prefix",
2933 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2934 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2939 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2940 p + RBD_OBJ_PREFIX_LEN_MAX,
2943 if (IS_ERR(rbd_dev->header.object_prefix)) {
2944 ret = PTR_ERR(rbd_dev->header.object_prefix);
2945 rbd_dev->header.object_prefix = NULL;
2947 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch a snapshot's feature bits via "get_features".  Any incompat
 * bit outside RBD_FEATURES_SUPPORTED causes refusal (error path in
 * elided lines).
 */
2956 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2959 __le64 snapid = cpu_to_le64(snap_id);
2963 } features_buf = { 0 };
2967 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2968 "rbd", "get_features",
2969 (char *) &snapid, sizeof (snapid),
2970 (char *) &features_buf, sizeof (features_buf),
2972 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2976 incompat = le64_to_cpu(features_buf.incompat);
2977 if (incompat & ~RBD_FEATURES_SUPPORTED)
2980 *snap_features = le64_to_cpu(features_buf.features);
2982 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2983 (unsigned long long) snap_id,
2984 (unsigned long long) *snap_features,
2985 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Convenience wrapper: base image (CEPH_NOSNAP) feature bits. */
2990 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2992 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2993 &rbd_dev->header.features)
/*
 * Query "get_parent" for a layered (cloned) image and record the
 * parent spec (pool id, image id, snap id) plus the overlap size.
 * CEPH_NOPOOL in the reply means the image has no parent; pool ids
 * must fit in 32 bits because the ceph file layout stores them so.
 * On success ownership of parent_spec moves to rbd_dev.
 * NOTE(review): excerpt — some original lines are elided here.
 */
2996 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2998 struct rbd_spec *parent_spec;
3000 void *reply_buf = NULL;
3008 parent_spec = rbd_spec_alloc();
3012 size = sizeof (__le64) + /* pool_id */
3013 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3014 sizeof (__le64) + /* snap_id */
3015 sizeof (__le64); /* overlap */
3016 reply_buf = kmalloc(size, GFP_KERNEL);
3022 snapid = cpu_to_le64(CEPH_NOSNAP);
3023 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3024 "rbd", "get_parent",
3025 (char *) &snapid, sizeof (snapid),
3026 (char *) reply_buf, size, NULL);
3027 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3033 end = (char *) reply_buf + size;
3034 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3035 if (parent_spec->pool_id == CEPH_NOPOOL)
3036 goto out; /* No parent? No problem. */
3038 /* The ceph file layout needs to fit pool id in 32 bits */
3041 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3044 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3045 if (IS_ERR(image_id)) {
3046 ret = PTR_ERR(image_id);
3049 parent_spec->image_id = image_id;
3050 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3051 ceph_decode_64_safe(&p, end, overlap, out_err);
3053 rbd_dev->parent_overlap = overlap;
3054 rbd_dev->parent_spec = parent_spec;
3055 parent_spec = NULL; /* rbd_dev now owns this */
3060 rbd_spec_put(parent_spec);
/*
 * Reverse-map an image id to its user-visible name by calling the
 * "dir_get_name" class method on the RBD_DIRECTORY object.  The
 * encoded image id is the request payload; the decoded reply string
 * is returned (caller owns it — cleanup paths are in elided lines).
 */
3065 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3067 size_t image_id_size;
3072 void *reply_buf = NULL;
3074 char *image_name = NULL;
3077 rbd_assert(!rbd_dev->spec->image_name);
3079 len = strlen(rbd_dev->spec->image_id);
3080 image_id_size = sizeof (__le32) + len;
3081 image_id = kmalloc(image_id_size, GFP_KERNEL);
3086 end = (char *) image_id + image_id_size;
3087 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3089 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3090 reply_buf = kmalloc(size, GFP_KERNEL);
3094 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3095 "rbd", "dir_get_name",
3096 image_id, image_id_size,
3097 (char *) reply_buf, size, NULL);
3101 end = (char *) reply_buf + size;
3102 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3103 if (IS_ERR(image_name))
3106 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3115 * When a parent image gets probed, we only have the pool, image,
3116 * and snapshot ids but not the names of any of them. This call
3117 * is made later to fill in those names. It has to be done after
3118 * rbd_dev_snaps_update() has completed because some of the
3119 * information (in particular, snapshot name) is not available
/*
 * Fill in pool/image/snapshot names from their ids.  A missing pool
 * or snapshot name is treated as a bug (rbd_warn comments say so);
 * a missing image name is tolerated with just a warning.
 */
3122 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3124 struct ceph_osd_client *osdc;
3126 void *reply_buf = NULL;
3129 if (rbd_dev->spec->pool_name)
3130 return 0; /* Already have the names */
3132 /* Look up the pool name */
3134 osdc = &rbd_dev->rbd_client->client->osdc;
3135 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3137 rbd_warn(rbd_dev, "there is no pool with id %llu",
3138 rbd_dev->spec->pool_id); /* Really a BUG() */
3142 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3143 if (!rbd_dev->spec->pool_name)
3146 /* Fetch the image name; tolerate failure here */
3148 name = rbd_dev_image_name(rbd_dev);
3150 rbd_dev->spec->image_name = (char *) name;
3152 rbd_warn(rbd_dev, "unable to get image name");
3154 /* Look up the snapshot name. */
3156 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3158 rbd_warn(rbd_dev, "no snapshot with id %llu",
3159 rbd_dev->spec->snap_id); /* Really a BUG() */
3163 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3164 if(!rbd_dev->spec->snap_name)
3170 kfree(rbd_dev->spec->pool_name)
3171 rbd_dev->spec->pool_name = NULL;
/*
 * Fetch the image's snapshot context via "get_snapcontext": the
 * sequence number plus an array of snapshot ids.  The reply count is
 * validated against both SIZE_MAX (so the snapc allocation size
 * can't overflow) and the actual reply buffer length before the ids
 * are decoded.  The new context replaces rbd_dev->header.snapc.
 * NOTE(review): excerpt — some original lines are elided here.
 */
3176 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3185 struct ceph_snap_context *snapc;
3189 * We'll need room for the seq value (maximum snapshot id),
3190 * snapshot count, and array of that many snapshot ids.
3191 * For now we have a fixed upper limit on the number we're
3192 * prepared to receive.
3194 size = sizeof (__le64) + sizeof (__le32) +
3195 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3196 reply_buf = kzalloc(size, GFP_KERNEL);
3200 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3201 "rbd", "get_snapcontext",
3203 reply_buf, size, ver);
3204 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3210 end = (char *) reply_buf + size;
3211 ceph_decode_64_safe(&p, end, seq, out);
3212 ceph_decode_32_safe(&p, end, snap_count, out);
3215 * Make sure the reported number of snapshot ids wouldn't go
3216 * beyond the end of our buffer. But before checking that,
3217 * make sure the computed size of the snapshot context we
3218 * allocate is representable in a size_t.
3220 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3225 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3228 size = sizeof (struct ceph_snap_context) +
3229 snap_count * sizeof (snapc->snaps[0]);
3230 snapc = kmalloc(size, GFP_KERNEL);
3236 atomic_set(&snapc->nref, 1);
3238 snapc->num_snaps = snap_count;
3239 for (i = 0; i < snap_count; i++)
3240 snapc->snaps[i] = ceph_decode_64(&p);
3242 rbd_dev->header.snapc = snapc;
3244 dout(" snap context seq = %llu, snap_count = %u\n",
3245 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Fetch one snapshot's name via "get_snapshot_name", keyed by the
 * snap id at index "which" in the current snapshot context.
 * Returns the allocated name or ERR_PTR.
 */
3253 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3263 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3264 reply_buf = kmalloc(size, GFP_KERNEL);
3266 return ERR_PTR(-ENOMEM);
3268 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3269 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3270 "rbd", "get_snapshot_name",
3271 (char *) &snap_id, sizeof (snap_id),
3272 reply_buf, size, NULL);
3273 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3278 end = (char *) reply_buf + size;
3279 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3280 if (IS_ERR(snap_name)) {
3281 ret = PTR_ERR(snap_name);
3284 dout(" snap_id 0x%016llx snap_name = %s\n",
3285 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3293 return ERR_PTR(ret);
/*
 * Format 2 snapshot info: size and features each come from their own
 * OSD class method call; the name is fetched last and returned
 * (or ERR_PTR on any failure).
 */
3296 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3297 u64 *snap_size, u64 *snap_features)
3303 snap_id = rbd_dev->header.snapc->snaps[which];
3304 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3306 return ERR_PTR(ret);
3307 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3309 return ERR_PTR(ret);
3311 return rbd_dev_v2_snap_name(rbd_dev, which);
/* Dispatch snapshot-info lookup on image format (1 or 2). */
3314 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3315 u64 *snap_size, u64 *snap_features)
3317 if (rbd_dev->image_format == 1)
3318 return rbd_dev_v1_snap_info(rbd_dev, which,
3319 snap_size, snap_features);
3320 if (rbd_dev->image_format == 2)
3321 return rbd_dev_v2_snap_info(rbd_dev, which,
3322 snap_size, snap_features);
3323 return ERR_PTR(-EINVAL);
/*
 * Re-read format 2 metadata under header_rwsem: image size (object
 * order is expected not to change — checked), mapping size, snapshot
 * context, and the snapshot list/devices.
 * NOTE(review): excerpt — some original lines are elided here.
 */
3326 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3331 down_write(&rbd_dev->header_rwsem);
3333 /* Grab old order first, to see if it changes */
3335 obj_order = rbd_dev->header.obj_order,
3336 ret = rbd_dev_v2_image_size(rbd_dev);
3339 if (rbd_dev->header.obj_order != obj_order) {
3343 rbd_update_mapping_size(rbd_dev);
3345 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3346 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3349 ret = rbd_dev_snaps_update(rbd_dev);
3350 dout("rbd_dev_snaps_update returned %d\n", ret);
3353 ret = rbd_dev_snaps_register(rbd_dev);
3354 dout("rbd_dev_snaps_register returned %d\n", ret);
3356 up_write(&rbd_dev->header_rwsem);
3362 * Scan the rbd device's current snapshot list and compare it to the
3363 * newly-received snapshot context. Remove any existing snapshots
3364 * not present in the new snapshot context. Add a new snapshot for
3365 * any snaphots in the snapshot context not in the current list.
3366 * And verify there are no changes to snapshots we already know
3369 * Assumes the snapshots in the snapshot context are sorted by
3370 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3371 * are also maintained in that order.)
/*
 * Merge-walk: "index" advances through the (sorted) snap context,
 * "links" through the existing snap list; at each step either a stale
 * snap is removed, a new one is inserted, or a matching pair is
 * verified unchanged.  NOTE(review): excerpt — lines elided.
 */
3373 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3375 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3376 const u32 snap_count = snapc->num_snaps;
3377 struct list_head *head = &rbd_dev->snaps;
3378 struct list_head *links = head->next;
3381 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3382 while (index < snap_count || links != head) {
3384 struct rbd_snap *snap;
3387 u64 snap_features = 0;
3389 snap_id = index < snap_count ? snapc->snaps[index]
3391 snap = links != head ? list_entry(links, struct rbd_snap, node)
3393 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3395 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3396 struct list_head *next = links->next;
3399 * A previously-existing snapshot is not in
3400 * the new snap context.
3402 * If the now missing snapshot is the one the
3403 * image is mapped to, clear its exists flag
3404 * so we can avoid sending any more requests
3407 if (rbd_dev->spec->snap_id == snap->id)
3408 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3409 rbd_remove_snap_dev(snap);
3410 dout("%ssnap id %llu has been removed\n",
3411 rbd_dev->spec->snap_id == snap->id ?
3413 (unsigned long long) snap->id);
3415 /* Done with this list entry; advance */
3421 snap_name = rbd_dev_snap_info(rbd_dev, index,
3422 &snap_size, &snap_features);
3423 if (IS_ERR(snap_name))
3424 return PTR_ERR(snap_name);
3426 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3427 (unsigned long long) snap_id);
3428 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3429 struct rbd_snap *new_snap;
3431 /* We haven't seen this snapshot before */
3433 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3434 snap_id, snap_size, snap_features);
3435 if (IS_ERR(new_snap)) {
3436 int err = PTR_ERR(new_snap);
3438 dout(" failed to add dev, error %d\n", err);
3443 /* New goes before existing, or at end of list */
3445 dout(" added dev%s\n", snap ? "" : " at end\n");
3447 list_add_tail(&new_snap->node, &snap->node);
3449 list_add_tail(&new_snap->node, head);
3451 /* Already have this one */
3453 dout(" already present\n");
3455 rbd_assert(snap->size == snap_size);
3456 rbd_assert(!strcmp(snap->name, snap_name));
3457 rbd_assert(snap->features == snap_features);
3459 /* Done with this list entry; advance */
3461 links = links->next;
3464 /* Advance to the next entry in the snapshot context */
3468 dout("%s: done\n", __func__);
3474 * Scan the list of snapshots and register the devices for any that
3475 * have not already been registered.
/* Requires the parent rbd device to be registered first (WARN_ON). */
3477 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3479 struct rbd_snap *snap;
3482 dout("%s:\n", __func__);
3483 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3486 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3487 if (!rbd_snap_registered(snap)) {
3488 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3493 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus under ctl_mutex; the device
 * is named by its numeric dev_id and parented to rbd_root_dev.
 * NOTE(review): excerpt — some original lines are elided here.
 */
3498 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3503 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3505 dev = &rbd_dev->dev;
3506 dev->bus = &rbd_bus_type;
3507 dev->type = &rbd_device_type;
3508 dev->parent = &rbd_root_dev;
3509 dev->release = rbd_dev_release;
3510 dev_set_name(dev, "%d", rbd_dev->dev_id);
3511 ret = device_register(dev);
3513 mutex_unlock(&ctl_mutex);
3518 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3520 device_unregister(&rbd_dev->dev);
/* Highest device id handed out so far; ids start at 1. */
3523 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3526 * Get a unique rbd identifier for the given new rbd_dev, and add
3527 * the rbd_dev to the global list. The minimum rbd id is 1.
3529 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3531 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3533 spin_lock(&rbd_dev_list_lock);
3534 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3535 spin_unlock(&rbd_dev_list_lock);
3536 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3537 (unsigned long long) rbd_dev->dev_id);
3541 * Remove an rbd_dev from the global list, and record that its
3542 * identifier is no longer in use.
/*
 * If the departing id was the maximum, rescan the list (backward,
 * since the max is likely near the tail) and lower rbd_dev_id_max
 * with cmpxchg so a concurrent rbd_dev_id_get() isn't clobbered.
 */
3544 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3546 struct list_head *tmp;
3547 int rbd_id = rbd_dev->dev_id;
3550 rbd_assert(rbd_id > 0);
3552 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3553 (unsigned long long) rbd_dev->dev_id);
3554 spin_lock(&rbd_dev_list_lock);
3555 list_del_init(&rbd_dev->node);
3558 * If the id being "put" is not the current maximum, there
3559 * is nothing special we need to do.
3561 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3562 spin_unlock(&rbd_dev_list_lock);
3567 * We need to update the current maximum id. Search the
3568 * list to find out what it is. We're more likely to find
3569 * the maximum at the end, so search the list backward.
3572 list_for_each_prev(tmp, &rbd_dev_list) {
3573 struct rbd_device *rbd_dev;
3575 rbd_dev = list_entry(tmp, struct rbd_device, node);
3576 if (rbd_dev->dev_id > max_id)
3577 max_id = rbd_dev->dev_id;
3579 spin_unlock(&rbd_dev_list_lock);
3582 * The max id could have been updated by rbd_dev_id_get(), in
3583 * which case it now accurately reflects the new maximum.
3584 * Be careful not to overwrite the maximum value in that
3587 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3588 dout(" max dev id has been reset\n");
3592 * Skips over white space at *buf, and updates *buf to point to the
3593 * first found non-space character (if any). Returns the length of
3594 * the token (string of non-white space characters) found. Note
3595 * that *buf must be terminated with '\0'.
3597 static inline size_t next_token(const char **buf)
3600 * These are the characters that produce nonzero for
3601 * isspace() in the "C" and "POSIX" locales.
3603 const char *spaces = " \f\n\r\t\v";
3605 *buf += strspn(*buf, spaces); /* Find start of token */
3607 return strcspn(*buf, spaces); /* Return token length */
3611 * Finds the next token in *buf, and if the provided token buffer is
3612 * big enough, copies the found token into it. The result, if
3613 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3614 * must be terminated with '\0' on entry.
3616 * Returns the length of the token found (not including the '\0').
3617 * Return value will be 0 if no token is found, and it will be >=
3618 * token_size if the token would not fit.
3620 * The *buf pointer will be updated to point beyond the end of the
3621 * found token. Note that this occurs even if the token buffer is
3622 * too small to hold it.
3624 static inline size_t copy_token(const char **buf,
3630 len = next_token(buf);
3631 if (len < token_size) {
3632 memcpy(token, *buf, len);
3633 *(token + len) = '\0';
3641 * Finds the next token in *buf, dynamically allocates a buffer big
3642 * enough to hold a copy of it, and copies the token into the new
3643 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3644 * that a duplicate buffer is created even for a zero-length token.
3646 * Returns a pointer to the newly-allocated duplicate, or a null
3647 * pointer if memory for the duplicate was not available. If
3648 * the lenp argument is a non-null pointer, the length of the token
3649 * (not including the '\0') is returned in *lenp.
3651 * If successful, the *buf pointer will be updated to point beyond
3652 * the end of the found token.
3654 * Note: uses GFP_KERNEL for allocation.
3656 static inline char *dup_token(const char **buf, size_t *lenp)
3661 len = next_token(buf);
/* kmemdup copies len+1 bytes; the terminator is (re)written below. */
3662 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3665 *(dup + len) = '\0';
3675 * Parse the options provided for an "rbd add" (i.e., rbd image
3676 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3677 * and the data written is passed here via a NUL-terminated buffer.
3678 * Returns 0 if successful or an error code otherwise.
3680 * The information extracted from these options is recorded in
3681 * the other parameters which return dynamically-allocated
3684 * The address of a pointer that will refer to a ceph options
3685 * structure. Caller must release the returned pointer using
3686 * ceph_destroy_options() when it is no longer needed.
3688 * Address of an rbd options pointer. Fully initialized by
3689 * this function; caller must release with kfree().
3691 * Address of an rbd image specification pointer. Fully
3692 * initialized by this function based on parsed options.
3693 * Caller must release with rbd_spec_put().
3695 * The options passed take this form:
3696 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3699 * A comma-separated list of one or more monitor addresses.
3700 * A monitor address is an ip address, optionally followed
3701 * by a port number (separated by a colon).
3702 * I.e.: ip1[:port1][,ip2[:port2]...]
3704 * A comma-separated list of ceph and/or rbd options.
3706 * The name of the rados pool containing the rbd image.
3708 * The name of the image in that pool to map.
3710 * An optional snapshot id. If provided, the mapping will
3711 * present data from the image at the time that snapshot was
3712 * created. The image head is used if no snapshot id is
3713 * provided. Snapshot mappings are always read-only.
/* NOTE(review): excerpt — error-unwind labels/lines are elided here. */
3715 static int rbd_add_parse_args(const char *buf,
3716 struct ceph_options **ceph_opts,
3717 struct rbd_options **opts,
3718 struct rbd_spec **rbd_spec)
3722 const char *mon_addrs;
3723 size_t mon_addrs_size;
3724 struct rbd_spec *spec = NULL;
3725 struct rbd_options *rbd_opts = NULL;
3726 struct ceph_options *copts;
3729 /* The first four tokens are required */
3731 len = next_token(&buf);
3733 rbd_warn(NULL, "no monitor address(es) provided");
3737 mon_addrs_size = len + 1;
3741 options = dup_token(&buf, NULL);
3745 rbd_warn(NULL, "no options provided");
3749 spec = rbd_spec_alloc();
3753 spec->pool_name = dup_token(&buf, NULL);
3754 if (!spec->pool_name)
3756 if (!*spec->pool_name) {
3757 rbd_warn(NULL, "no pool name provided");
3761 spec->image_name = dup_token(&buf, NULL);
3762 if (!spec->image_name)
3764 if (!*spec->image_name) {
3765 rbd_warn(NULL, "no image name provided");
3770 * Snapshot name is optional; default is to use "-"
3771 * (indicating the head/no snapshot).
3773 len = next_token(&buf);
3775 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3776 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3777 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3778 ret = -ENAMETOOLONG;
3781 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3782 if (!spec->snap_name)
3784 *(spec->snap_name + len) = '\0';
3786 /* Initialize all rbd options to the defaults */
3788 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3792 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3794 copts = ceph_parse_options(options, mon_addrs,
3795 mon_addrs + mon_addrs_size - 1,
3796 parse_rbd_opts_token, rbd_opts);
3797 if (IS_ERR(copts)) {
3798 ret = PTR_ERR(copts);
3819 * An rbd format 2 image has a unique identifier, distinct from the
3820 * name given to it by the user. Internally, that identifier is
3821 * what's used to specify the names of objects related to the image.
3823 * A special "rbd id" object is used to map an rbd image name to its
3824 * id. If that object doesn't exist, then there is no v2 rbd image
3825 * with the supplied name.
3827 * This function will record the given rbd_dev's image_id field if
3828 * it can be determined, and in that case will return 0. If any
3829 * errors occur a negative errno will be returned and the rbd_dev's
3830 * image_id field will be unchanged (and should be NULL).
/* NOTE(review): excerpt — some original lines are elided here. */
3832 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3841 * When probing a parent image, the image id is already
3842 * known (and the image name likely is not). There's no
3843 * need to fetch the image id again in this case.
3845 if (rbd_dev->spec->image_id)
3849 * First, see if the format 2 image id file exists, and if
3850 * so, get the image's persistent id from it.
3852 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3853 object_name = kmalloc(size, GFP_NOIO);
3856 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3857 dout("rbd id object name is %s\n", object_name);
3859 /* Response will be an encoded string, which includes a length */
3861 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3862 response = kzalloc(size, GFP_NOIO);
3868 ret = rbd_obj_method_sync(rbd_dev, object_name,
3871 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3872 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3877 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3878 p + RBD_IMAGE_ID_LEN_MAX,
3880 if (IS_ERR(rbd_dev->spec->image_id)) {
3881 ret = PTR_ERR(rbd_dev->spec->image_id);
3882 rbd_dev->spec->image_id = NULL;
3884 dout("image_id is %s\n", rbd_dev->spec->image_id);
/*
 * Probe a format 1 image: record an empty image id, build the
 * "<image_name>.rbd" header object name (RBD_SUFFIX), read the v1
 * header, and mark the device format 1.  v1 images never have a
 * parent.  On error, allocated name/id strings are freed and reset.
 * NOTE(review): excerpt — some original lines are elided here.
 */
3893 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3898 /* Version 1 images have no id; empty string is used */
3900 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3901 if (!rbd_dev->spec->image_id)
3904 /* Record the header object name for this rbd image. */
3906 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3907 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3908 if (!rbd_dev->header_name) {
3912 sprintf(rbd_dev->header_name, "%s%s",
3913 rbd_dev->spec->image_name, RBD_SUFFIX);
3915 /* Populate rbd image metadata */
3917 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3921 /* Version 1 images have no parent (no layering) */
3923 rbd_dev->parent_spec = NULL;
3924 rbd_dev->parent_overlap = 0;
3926 rbd_dev->image_format = 1;
3928 dout("discovered version 1 image, header name is %s\n",
3929 rbd_dev->header_name);
3934 kfree(rbd_dev->header_name);
3935 rbd_dev->header_name = NULL;
3936 kfree(rbd_dev->spec->image_id);
3937 rbd_dev->spec->image_id = NULL;
/*
 * Probe a format 2 image: header object is "rbd_header.<image_id>"
 * (RBD_HEADER_PREFIX); then fetch size/order, object prefix,
 * features (with parent info if layered), and the snapshot context.
 * Crypto/compression are forced to 0 (unsupported for v2 here).
 */
3942 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3949 * Image id was filled in by the caller. Record the header
3950 * object name for this rbd image.
3952 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3953 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3954 if (!rbd_dev->header_name)
3956 sprintf(rbd_dev->header_name, "%s%s",
3957 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3959 /* Get the size and object order for the image */
3961 ret = rbd_dev_v2_image_size(rbd_dev);
3965 /* Get the object prefix (a.k.a. block_name) for the image */
3967 ret = rbd_dev_v2_object_prefix(rbd_dev);
3971 /* Get the and check features for the image */
3973 ret = rbd_dev_v2_features(rbd_dev);
3977 /* If the image supports layering, get the parent info */
3979 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3980 ret = rbd_dev_v2_parent_info(rbd_dev);
3985 /* crypto and compression type aren't (yet) supported for v2 images */
3987 rbd_dev->header.crypt_type = 0;
3988 rbd_dev->header.comp_type = 0;
3990 /* Get the snapshot context, plus the header version */
3992 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3995 rbd_dev->header.obj_version = ver;
3997 rbd_dev->image_format = 2;
3999 dout("discovered version 2 image, header name is %s\n",
4000 rbd_dev->header_name);
4004 rbd_dev->parent_overlap = 0;
4005 rbd_spec_put(rbd_dev->parent_spec);
4006 rbd_dev->parent_spec = NULL;
4007 kfree(rbd_dev->header_name);
4008 rbd_dev->header_name = NULL;
4009 kfree(rbd_dev->header.object_prefix);
4010 rbd_dev->header.object_prefix = NULL;
/*
 * Final stage of probing an rbd device: refresh snapshots, resolve the
 * image spec and mapping, allocate a device id, register the block
 * device and its gendisk, hook the device into the rbd bus, register
 * snapshot devices, establish the header watch, and finally announce
 * the disk.  Returns 0 on success or a negative errno.
 *
 * NOTE(review): this listing is lossy -- the original line numbering is
 * non-contiguous, so the `if (ret) goto ...` checks between the calls
 * below, and the err_out_* labels themselves, are mostly not shown.
 */
4015 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4019 /* no need to lock here, as rbd_dev is not registered yet */
4020 ret = rbd_dev_snaps_update(rbd_dev);
4024 ret = rbd_dev_probe_update_spec(rbd_dev);
4028 ret = rbd_dev_set_mapping(rbd_dev);
4032 /* generate unique id: find highest unique id, add one */
4033 rbd_dev_id_get(rbd_dev);
4035 /* Fill in the device name, now that we have its id. */
/* Compile-time guarantee that "rbd" plus a decimal id fits in name[]. */
4036 BUILD_BUG_ON(DEV_NAME_LEN
4037 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4038 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4040 /* Get our block major device number. */
/* register_blkdev(0, ...) asks the kernel to pick a free major. */
4042 ret = register_blkdev(0, rbd_dev->name);
4045 rbd_dev->major = ret;
4047 /* Set up the blkdev mapping. */
4049 ret = rbd_init_disk(rbd_dev);
4051 goto err_out_blkdev;
4053 ret = rbd_bus_add_dev(rbd_dev);
/*
 * Once the device is on the bus, sysfs owns teardown:
 */
4058 * At this point cleanup in the event of an error is the job
4059 * of the sysfs code (initiated by rbd_bus_del_dev()).
/* Snapshot registration mutates the header; take the rwsem for write. */
4061 down_write(&rbd_dev->header_rwsem);
4062 ret = rbd_dev_snaps_register(rbd_dev);
4063 up_write(&rbd_dev->header_rwsem);
/* Second argument 1 = start watching the header object for updates. */
4067 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4071 /* Everything's ready. Announce the disk to the world. */
4073 add_disk(rbd_dev->disk);
4075 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4076 (unsigned long long) rbd_dev->mapping.size);
/*
 * Error unwind (labels elided by the listing): the paths below undo,
 * in reverse order, what succeeded above.
 */
4080 /* this will also clean up rest of rbd_dev stuff */
4082 rbd_bus_del_dev(rbd_dev);
4086 rbd_free_disk(rbd_dev);
4088 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4090 rbd_dev_id_put(rbd_dev);
4092 rbd_remove_all_snaps(rbd_dev);
/*
 * rbd_dev_probe() -- top-level probe dispatcher.
 */
4098 * Probe for the existence of the header object for the given rbd
4099 * device. For format 2 images this includes determining the image
4102 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4107 * Get the id from the image id object. If it's not a
4108 * format 2 image, we'll get ENOENT back, and we'll assume
4109 * it's a format 1 image.
4111 ret = rbd_dev_image_id(rbd_dev);
/*
 * NOTE(review): the branch conditions are elided by this listing;
 * presumably a failing rbd_dev_image_id() selects the v1 probe and
 * success selects the v2 probe -- confirm against the full source.
 */
4113 ret = rbd_dev_v1_probe(rbd_dev);
4115 ret = rbd_dev_v2_probe(rbd_dev);
4117 dout("probe failed, returning %d\n", ret);
4122 ret = rbd_dev_probe_finish(rbd_dev);
/* On finish failure, release the header fields populated by the probe. */
4124 rbd_header_free(&rbd_dev->header);
/*
 * Sysfs "add" handler for the rbd bus: parses the user-supplied add
 * string, obtains a ceph client, resolves the pool, creates the
 * rbd_device, and probes it.  Returns the byte count consumed on
 * success or a negative errno.
 *
 * Ownership is handed off step by step (see the inline "now owns this"
 * comments); each local is NULLed once responsibility transfers so the
 * error unwind frees only what is still owned here.
 *
 * NOTE(review): lossy listing -- the remaining signature parameters,
 * the `if (rc ...)` checks, and the err_out_* labels are elided.
 */
4129 static ssize_t rbd_add(struct bus_type *bus,
4133 struct rbd_device *rbd_dev = NULL;
4134 struct ceph_options *ceph_opts = NULL;
4135 struct rbd_options *rbd_opts = NULL;
4136 struct rbd_spec *spec = NULL;
4137 struct rbd_client *rbdc;
4138 struct ceph_osd_client *osdc;
/* Pin the module while a device created by this call exists. */
4141 if (!try_module_get(THIS_MODULE))
4144 /* parse add command */
4145 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4147 goto err_out_module;
4149 rbdc = rbd_get_client(ceph_opts);
4154 ceph_opts = NULL; /* rbd_dev client now owns this */
/* pick the pool: translate the pool name from the spec to an id */
4157 osdc = &rbdc->client->osdc;
4158 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4160 goto err_out_client;
4161 spec->pool_id = (u64) rc;
4163 /* The ceph file layout needs to fit pool id in 32 bits */
4165 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4167 goto err_out_client;
4170 rbd_dev = rbd_dev_create(rbdc, spec);
4172 goto err_out_client;
4173 rbdc = NULL; /* rbd_dev now owns this */
4174 spec = NULL; /* rbd_dev now owns this */
/* The only option consumed here is read_only; the rest die with rbd_opts. */
4176 rbd_dev->mapping.read_only = rbd_opts->read_only;
4178 rbd_opts = NULL; /* done with this */
4180 rc = rbd_dev_probe(rbd_dev);
4182 goto err_out_rbd_dev;
/*
 * Error unwind (labels elided): destroy/put only the objects whose
 * ownership was NOT transferred above (non-NULL locals are safe to
 * release; ceph_destroy_options(NULL) etc. handle the rest).
 */
4186 rbd_dev_destroy(rbd_dev);
4188 rbd_put_client(rbdc);
4191 ceph_destroy_options(ceph_opts);
4195 module_put(THIS_MODULE);
4197 dout("Error adding device %s\n", buf);
4199 return (ssize_t) rc;
4202 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4204 struct list_head *tmp;
4205 struct rbd_device *rbd_dev;
4207 spin_lock(&rbd_dev_list_lock);
4208 list_for_each(tmp, &rbd_dev_list) {
4209 rbd_dev = list_entry(tmp, struct rbd_device, node);
4210 if (rbd_dev->dev_id == dev_id) {
4211 spin_unlock(&rbd_dev_list_lock);
4215 spin_unlock(&rbd_dev_list_lock);
4219 static void rbd_dev_release(struct device *dev)
4221 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4223 if (rbd_dev->watch_event)
4224 rbd_dev_header_watch_sync(rbd_dev, 0);
4226 /* clean up and free blkdev */
4227 rbd_free_disk(rbd_dev);
4228 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4230 /* release allocated disk header fields */
4231 rbd_header_free(&rbd_dev->header);
4233 /* done with the id, and with the rbd_dev */
4234 rbd_dev_id_put(rbd_dev);
4235 rbd_assert(rbd_dev->rbd_client != NULL);
4236 rbd_dev_destroy(rbd_dev);
4238 /* release module ref */
4239 module_put(THIS_MODULE);
/*
 * Sysfs "remove" handler for the rbd bus: parses a device id from the
 * user buffer, refuses removal while the device is open, marks it as
 * being removed, and deletes it from the bus (which triggers the rest
 * of the teardown via rbd_dev_release()).
 *
 * NOTE(review): lossy listing -- the remaining signature parameters,
 * local declarations (rc, ul, target_id), the not-found handling after
 * __rbd_get_dev(), and the "busy" early-exit path are elided.
 */
4242 static ssize_t rbd_remove(struct bus_type *bus,
4246 struct rbd_device *rbd_dev = NULL;
4251 rc = strict_strtoul(buf, 10, &ul);
4255 /* convert to int; abort if we lost anything in the conversion */
4256 target_id = (int) ul;
4257 if (target_id != ul)
/* ctl_mutex serializes add/remove against other control operations. */
4260 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4262 rbd_dev = __rbd_get_dev(target_id);
/* Reject removal of a device that is still held open. */
4268 spin_lock_irq(&rbd_dev->lock);
4269 if (rbd_dev->open_count)
4272 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4273 spin_unlock_irq(&rbd_dev->lock);
4277 rbd_remove_all_snaps(rbd_dev);
4278 rbd_bus_del_dev(rbd_dev);
4281 mutex_unlock(&ctl_mutex);
4287 * create control files in sysfs
4290 static int rbd_sysfs_init(void)
4294 ret = device_register(&rbd_root_dev);
4298 ret = bus_register(&rbd_bus_type);
4300 device_unregister(&rbd_root_dev);
4305 static void rbd_sysfs_cleanup(void)
4307 bus_unregister(&rbd_bus_type);
4308 device_unregister(&rbd_root_dev);
4311 static int __init rbd_init(void)
4315 if (!libceph_compatible(NULL)) {
4316 rbd_warn(NULL, "libceph incompatibility (quitting)");
4320 rc = rbd_sysfs_init();
4323 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4327 static void __exit rbd_exit(void)
4329 rbd_sysfs_cleanup();
/* Module entry/exit hooks and authorship/license metadata. */
4332 module_init(rbd_init);
4333 module_exit(rbd_exit);
4335 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4336 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4337 MODULE_DESCRIPTION("rados block device");
4339 /* following authorship retained from original osdblk.c */
4340 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4342 MODULE_LICENSE("GPL");