/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
58 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
60 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
64 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_SNAP_HEAD_NAME "-"
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX 64
72 #define RBD_OBJ_PREFIX_LEN_MAX 64
76 #define RBD_FEATURE_LAYERING (1<<0)
77 #define RBD_FEATURE_STRIPINGV2 (1<<1)
78 #define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
81 /* Features supported by this (client software) implementation. */
83 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
91 #define DEV_NAME_LEN 32
92 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/*
 * NOTE(review): extraction damage in this span — the original kernel
 * line number is fused onto the front of every line, and several
 * struct members and delimiter lines were dropped (e.g. the
 * object_prefix/obj_order/crypt_type/comp_type/image_size fields of
 * rbd_image_header that later code references, and the pool_id /
 * snap_id / kref members of rbd_spec).  Kept byte-identical; restore
 * from upstream drivers/block/rbd.c before compiling.
 */
95 * block device image metadata (in-memory version)
97 struct rbd_image_header {
98 /* These four fields never change for a given rbd image */
105 /* The remaining fields need to be updated occasionally */
107 struct ceph_snap_context *snapc;
116 * An rbd image specification.
118 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
119 * identify an image. Each rbd_dev structure includes a pointer to
120 * an rbd_spec structure that encapsulates this identity.
122 * Each of the id's in an rbd_spec has an associated name. For a
123 * user-mapped image, the names are supplied and the id's associated
124 * with them are looked up. For a layered image, a parent image is
125 * defined by the tuple, and the names are looked up.
127 * An rbd_dev structure contains a parent_spec pointer which is
128 * non-null if the image it represents is a child in a layered
129 * image. This pointer will refer to the rbd_spec structure used
130 * by the parent rbd_dev for its own identity (i.e., the structure
131 * is shared between the parent and child).
133 * Since these structures are populated once, during the discovery
134 * phase of image construction, they are effectively immutable so
135 * we make no effort to synchronize access to them.
137 * Note that code herein does not assume the image name is known (it
138 * could be a null pointer).
142 const char *pool_name;
144 const char *image_id;
145 const char *image_name;
148 const char *snap_name;
/* NOTE(review): start of struct rbd_client — header line dropped above. */
154 * an instance of the client. multiple devices may share an rbd client.
157 struct ceph_client *client;
159 struct list_head node;
/*
 * NOTE(review): extraction damage — fused line numbers, dropped brace
 * lines, the "enum obj_req_flags {" header before the OBJ_REQ_* list,
 * the union holding bio_list/pages, and the kref members of both
 * request structs.  Kept byte-identical; restore from upstream
 * drivers/block/rbd.c.
 */
162 struct rbd_img_request;
163 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
165 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
167 struct rbd_obj_request;
168 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
170 enum obj_request_type {
171 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
/* NOTE(review): "enum obj_req_flags {" line dropped here. */
175 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
176 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
177 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
178 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
181 struct rbd_obj_request {
182 const char *object_name;
183 u64 offset; /* object start byte */
184 u64 length; /* bytes from offset */
188 * An object request associated with an image will have its
189 * img_data flag set; a standalone object request will not.
191 * A standalone object request will have which == BAD_WHICH
192 * and a null obj_request pointer.
194 * An object request initiated in support of a layered image
195 * object (to check for its existence before a write) will
196 * have which == BAD_WHICH and a non-null obj_request pointer.
198 * Finally, an object request for rbd image data will have
199 * which != BAD_WHICH, and will have a non-null img_request
200 * pointer. The value of which will be in the range
201 * 0..(img_request->obj_request_count-1).
204 struct rbd_obj_request *obj_request; /* STAT op */
206 struct rbd_img_request *img_request;
208 /* links for img_request->obj_requests list */
209 struct list_head links;
212 u32 which; /* posn image request list */
214 enum obj_request_type type;
216 struct bio *bio_list;
222 struct page **copyup_pages;
224 struct ceph_osd_request *osd_req;
226 u64 xferred; /* bytes transferred */
230 rbd_obj_callback_t callback;
231 struct completion completion;
/* NOTE(review): "enum img_req_flags {" header dropped here. */
237 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
238 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
239 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
242 struct rbd_img_request {
243 struct rbd_device *rbd_dev;
244 u64 offset; /* starting image byte offset */
245 u64 length; /* byte count from offset */
248 u64 snap_id; /* for reads */
249 struct ceph_snap_context *snapc; /* for writes */
252 struct request *rq; /* block request */
253 struct rbd_obj_request *obj_request; /* obj req initiator */
255 struct page **copyup_pages;
256 spinlock_t completion_lock;/* protects next_completion */
258 rbd_img_callback_t callback;
259 u64 xferred;/* aggregate bytes transferred */
260 int result; /* first nonzero obj_request result */
262 u32 obj_request_count;
263 struct list_head obj_requests; /* rbd_obj_request structs */
268 #define for_each_obj_request(ireq, oreq) \
269 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
270 #define for_each_obj_request_from(ireq, oreq) \
271 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_safe(ireq, oreq, n) \
273 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
/*
 * NOTE(review): extraction damage — this span holds the tail of
 * struct rbd_snap, struct rbd_device, the rbd_dev_flags enum and the
 * file-scope globals/prototypes.  Fused line numbers; dropped lines
 * include most rbd_snap members, struct braces, prototype
 * continuation lines ("size_t count);") and the bus_type .name
 * initializer.  Kept byte-identical; restore from upstream.
 */
278 struct list_head node;
293 int dev_id; /* blkdev unique id */
295 int major; /* blkdev assigned major */
296 struct gendisk *disk; /* blkdev's gendisk and rq */
298 u32 image_format; /* Either 1 or 2 */
299 struct rbd_client *rbd_client;
301 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
303 spinlock_t lock; /* queue, flags, open_count */
305 struct rbd_image_header header;
306 unsigned long flags; /* possibly lock protected */
307 struct rbd_spec *spec;
311 struct ceph_file_layout layout;
313 struct ceph_osd_event *watch_event;
314 struct rbd_obj_request *watch_request;
316 struct rbd_spec *parent_spec;
318 struct rbd_device *parent;
320 /* protects updating the header */
321 struct rw_semaphore header_rwsem;
323 struct rbd_mapping mapping;
325 struct list_head node;
327 /* list of snapshots */
328 struct list_head snaps;
332 unsigned long open_count; /* protected by lock */
336 * Flag bits for rbd_dev->flags. If atomicity is required,
337 * rbd_dev->lock is used to protect access.
339 * Currently, only the "removing" flag (which is coupled with the
340 * "open_count" field) requires atomic access.
343 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
344 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
347 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
349 static LIST_HEAD(rbd_dev_list); /* devices */
350 static DEFINE_SPINLOCK(rbd_dev_list_lock);
352 static LIST_HEAD(rbd_client_list); /* clients */
353 static DEFINE_SPINLOCK(rbd_client_list_lock);
355 static int rbd_img_request_submit(struct rbd_img_request *img_request);
357 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
359 static void rbd_dev_device_release(struct device *dev);
360 static void rbd_snap_destroy(struct rbd_snap *snap);
362 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
364 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
366 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
368 static struct bus_attribute rbd_bus_attrs[] = {
369 __ATTR(add, S_IWUSR, NULL, rbd_add),
370 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
374 static struct bus_type rbd_bus_type = {
376 .bus_attrs = rbd_bus_attrs,
379 static void rbd_root_dev_release(struct device *dev)
383 static struct device rbd_root_dev = {
385 .release = rbd_root_dev_release,
388 static __printf(2, 3)
389 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
391 struct va_format vaf;
399 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
400 else if (rbd_dev->disk)
401 printk(KERN_WARNING "%s: %s: %pV\n",
402 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
403 else if (rbd_dev->spec && rbd_dev->spec->image_name)
404 printk(KERN_WARNING "%s: image %s: %pV\n",
405 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
406 else if (rbd_dev->spec && rbd_dev->spec->image_id)
407 printk(KERN_WARNING "%s: id %s: %pV\n",
408 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
410 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
411 RBD_DRV_NAME, rbd_dev, &vaf);
/*
 * rbd_assert(): in RBD_DEBUG builds, log the failed expression and
 * BUG(); otherwise compiles to nothing.  Reconstructed: extraction
 * dropped the "#ifdef RBD_DEBUG", the "at line %d" format line and
 * the BUG() call.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
/* Forward declarations for routines defined later in this file.
 * NOTE(review): fused original line numbers are extraction artifacts. */
428 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
430 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
432 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
433 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
437 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438 bool removing = false;
440 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
443 spin_lock_irq(&rbd_dev->lock);
444 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
447 rbd_dev->open_count++;
448 spin_unlock_irq(&rbd_dev->lock);
452 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453 (void) get_device(&rbd_dev->dev);
454 set_device_ro(bdev, rbd_dev->mapping.read_only);
455 mutex_unlock(&ctl_mutex);
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
462 struct rbd_device *rbd_dev = disk->private_data;
463 unsigned long open_count_before;
465 spin_lock_irq(&rbd_dev->lock);
466 open_count_before = rbd_dev->open_count--;
467 spin_unlock_irq(&rbd_dev->lock);
468 rbd_assert(open_count_before > 0);
470 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471 put_device(&rbd_dev->dev);
472 mutex_unlock(&ctl_mutex);
477 static const struct block_device_operations rbd_bd_ops = {
478 .owner = THIS_MODULE,
480 .release = rbd_release,
484 * Initialize an rbd client instance.
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
489 struct rbd_client *rbdc;
492 dout("%s:\n", __func__);
493 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
497 kref_init(&rbdc->kref);
498 INIT_LIST_HEAD(&rbdc->node);
500 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
502 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503 if (IS_ERR(rbdc->client))
505 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
507 ret = ceph_open_session(rbdc->client);
511 spin_lock(&rbd_client_list_lock);
512 list_add_tail(&rbdc->node, &rbd_client_list);
513 spin_unlock(&rbd_client_list_lock);
515 mutex_unlock(&ctl_mutex);
516 dout("%s: rbdc %p\n", __func__, rbdc);
521 ceph_destroy_client(rbdc->client);
523 mutex_unlock(&ctl_mutex);
527 ceph_destroy_options(ceph_opts);
528 dout("%s: error %d\n", __func__, ret);
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
535 kref_get(&rbdc->kref);
541 * Find a ceph client with specific addr and configuration. If
542 * found, bump its reference count.
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
546 struct rbd_client *client_node;
549 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
552 spin_lock(&rbd_client_list_lock);
553 list_for_each_entry(client_node, &rbd_client_list, node) {
554 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555 __rbd_get_client(client_node);
561 spin_unlock(&rbd_client_list_lock);
563 return found ? client_node : NULL;
/*
 * NOTE(review): extraction damage — the mount-option token enum
 * (Opt_last_int / Opt_last_string / Opt_read_only / Opt_read_write /
 * Opt_last_bool, referenced by parse_rbd_opts_token below), the
 * struct rbd_options definition, and the {-1, NULL} terminator of
 * rbd_opts_tokens were dropped.  Kept byte-identical; restore from
 * upstream before compiling.
 */
573 /* string args above */
576 /* Boolean args above */
580 static match_table_t rbd_opts_tokens = {
582 /* string args above */
583 {Opt_read_only, "read_only"},
584 {Opt_read_only, "ro"}, /* Alternate spelling */
585 {Opt_read_write, "read_write"},
586 {Opt_read_write, "rw"}, /* Alternate spelling */
587 /* Boolean args above */
595 #define RBD_READ_ONLY_DEFAULT false
597 static int parse_rbd_opts_token(char *c, void *private)
599 struct rbd_options *rbd_opts = private;
600 substring_t argstr[MAX_OPT_ARGS];
601 int token, intval, ret;
603 token = match_token(c, rbd_opts_tokens, argstr);
607 if (token < Opt_last_int) {
608 ret = match_int(&argstr[0], &intval);
610 pr_err("bad mount option arg (not int) "
614 dout("got int token %d val %d\n", token, intval);
615 } else if (token > Opt_last_int && token < Opt_last_string) {
616 dout("got string token %d val %s\n", token,
618 } else if (token > Opt_last_string && token < Opt_last_bool) {
619 dout("got Boolean token %d\n", token);
621 dout("got token %d\n", token);
626 rbd_opts->read_only = true;
629 rbd_opts->read_only = false;
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Consumes ceph_opts either way: it is
 * destroyed when an existing client is reused, or ownership passes
 * to rbd_client_create().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
656 * Destroy ceph client
658 * Caller must hold rbd_client_list_lock.
660 static void rbd_client_release(struct kref *kref)
662 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
664 dout("%s: rbdc %p\n", __func__, rbdc);
665 spin_lock(&rbd_client_list_lock);
666 list_del(&rbdc->node);
667 spin_unlock(&rbd_client_list_lock);
669 ceph_destroy_client(rbdc->client);
674 * Drop reference to ceph client node. If it's not referenced anymore, release
677 static void rbd_put_client(struct rbd_client *rbdc)
680 kref_put(&rbdc->kref, rbd_client_release);
683 static bool rbd_image_format_valid(u32 image_format)
685 return image_format == 1 || image_format == 2;
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
693 /* The header has to start with the magic rbd header text */
694 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
697 /* The bio layer requires at least sector-sized I/O */
699 if (ondisk->options.order < SECTOR_SHIFT)
702 /* If we use u64 in a few spots we may be able to loosen this */
704 if (ondisk->options.order > 8 * sizeof (int) - 1)
708 * The size of a snapshot header has to fit in a size_t, and
709 * that limits the number of snapshots.
711 snap_count = le32_to_cpu(ondisk->snap_count);
712 size = SIZE_MAX - sizeof (struct ceph_snap_context);
713 if (snap_count > size / sizeof (__le64))
717 * Not only that, but the size of the entire the snapshot
718 * header must also be representable in a size_t.
720 size -= snap_count * sizeof (__le64);
721 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
/*
 * NOTE(review): extraction damage — fused line numbers; dropped lines
 * include the local declarations (len, size, snap_count, i), the
 * "if (snap_count)" branch heads, the goto out_err error checks after
 * each allocation, the "return 0;" success path and the out_err
 * label/return.  The visible cleanup tail (freeing snap_sizes,
 * snap_names, object_prefix and NULLing each) is the out_err path.
 * Kept byte-identical; restore from upstream before compiling.
 */
728 * Create a new header structure, translate header format from the on-disk
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732 struct rbd_image_header_ondisk *ondisk)
739 memset(header, 0, sizeof (*header));
741 snap_count = le32_to_cpu(ondisk->snap_count);
743 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745 if (!header->object_prefix)
747 memcpy(header->object_prefix, ondisk->object_prefix, len);
748 header->object_prefix[len] = '\0';
751 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
753 /* Save a copy of the snapshot names */
755 if (snap_names_len > (u64) SIZE_MAX)
757 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758 if (!header->snap_names)
761 * Note that rbd_dev_v1_header_read() guarantees
762 * the ondisk buffer we're working with has
763 * snap_names_len bytes beyond the end of the
764 * snapshot id array, this memcpy() is safe.
766 memcpy(header->snap_names, &ondisk->snaps[snap_count],
769 /* Record each snapshot's size */
771 size = snap_count * sizeof (*header->snap_sizes);
772 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773 if (!header->snap_sizes)
775 for (i = 0; i < snap_count; i++)
776 header->snap_sizes[i] =
777 le64_to_cpu(ondisk->snaps[i].image_size);
779 header->snap_names = NULL;
780 header->snap_sizes = NULL;
783 header->features = 0; /* No features support in v1 images */
784 header->obj_order = ondisk->options.order;
785 header->crypt_type = ondisk->options.crypt_type;
786 header->comp_type = ondisk->options.comp_type;
788 /* Allocate and fill in the snapshot context */
790 header->image_size = le64_to_cpu(ondisk->image_size);
792 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
795 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
796 for (i = 0; i < snap_count; i++)
797 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
/* NOTE(review): error-path cleanup below (out_err label dropped). */
802 kfree(header->snap_sizes);
803 header->snap_sizes = NULL;
804 kfree(header->snap_names);
805 header->snap_names = NULL;
806 kfree(header->object_prefix);
807 header->object_prefix = NULL;
812 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
814 struct rbd_snap *snap;
816 if (snap_id == CEPH_NOSNAP)
817 return RBD_SNAP_HEAD_NAME;
819 list_for_each_entry(snap, &rbd_dev->snaps, node)
820 if (snap_id == snap->id)
826 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
827 const char *snap_name)
829 struct rbd_snap *snap;
831 list_for_each_entry(snap, &rbd_dev->snaps, node)
832 if (!strcmp(snap_name, snap->name))
838 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
840 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
841 sizeof (RBD_SNAP_HEAD_NAME))) {
842 rbd_dev->mapping.size = rbd_dev->header.image_size;
843 rbd_dev->mapping.features = rbd_dev->header.features;
845 struct rbd_snap *snap;
847 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
850 rbd_dev->mapping.size = snap->size;
851 rbd_dev->mapping.features = snap->features;
852 rbd_dev->mapping.read_only = true;
858 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
860 rbd_dev->mapping.size = 0;
861 rbd_dev->mapping.features = 0;
862 rbd_dev->mapping.read_only = true;
865 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
867 rbd_dev->mapping.size = 0;
868 rbd_dev->mapping.features = 0;
869 rbd_dev->mapping.read_only = true;
872 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
878 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
881 segment = offset >> rbd_dev->header.obj_order;
882 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
883 rbd_dev->header.object_prefix, segment);
884 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
885 pr_err("error formatting segment name for #%llu (%d)\n",
894 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
896 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
898 return offset & (segment_size - 1);
901 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
902 u64 offset, u64 length)
904 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
906 offset &= segment_size - 1;
908 rbd_assert(length <= U64_MAX - offset);
909 if (offset + length > segment_size)
910 length = segment_size - offset;
916 * returns the size of an object in the image
918 static u64 rbd_obj_bytes(struct rbd_image_header *header)
920 return 1 << header->obj_order;
927 static void bio_chain_put(struct bio *chain)
933 chain = chain->bi_next;
939 * zeros a bio chain, starting at specific offset
941 static void zero_bio_chain(struct bio *chain, int start_ofs)
950 bio_for_each_segment(bv, chain, i) {
951 if (pos + bv->bv_len > start_ofs) {
952 int remainder = max(start_ofs - pos, 0);
953 buf = bvec_kmap_irq(bv, &flags);
954 memset(buf + remainder, 0,
955 bv->bv_len - remainder);
956 bvec_kunmap_irq(buf, &flags);
961 chain = chain->bi_next;
966 * similar to zero_bio_chain(), zeros data defined by a page array,
967 * starting at the given byte offset from the start of the array and
968 * continuing up to the given end offset. The pages array is
969 * assumed to be big enough to hold all bytes up to the end.
971 static void zero_pages(struct page **pages, u64 offset, u64 end)
973 struct page **page = &pages[offset >> PAGE_SHIFT];
975 rbd_assert(end > offset);
976 rbd_assert(end - offset <= (u64)SIZE_MAX);
977 while (offset < end) {
983 page_offset = (size_t)(offset & ~PAGE_MASK);
984 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
985 local_irq_save(flags);
986 kaddr = kmap_atomic(*page);
987 memset(kaddr + page_offset, 0, length);
988 kunmap_atomic(kaddr);
989 local_irq_restore(flags);
/*
 * NOTE(review): extraction damage — fused line numbers; dropped lines
 * include the parameter tail (len, gfpmask), local declarations
 * (struct bio_vec *bv, resid, voff, idx, struct bio *bio), the
 * "resid = offset;" / "voff = resid;" setup before each segment scan,
 * the bi_size assignment and the final "return bio;".  Do not compile
 * as-is; restore from upstream drivers/block/rbd.c.
 */
997 * Clone a portion of a bio, starting at the given byte offset
998 * and continuing for the number of bytes indicated.
1000 static struct bio *bio_clone_range(struct bio *bio_src,
1001 unsigned int offset,
1009 unsigned short end_idx;
1010 unsigned short vcnt;
1013 /* Handle the easy case for the caller */
1015 if (!offset && len == bio_src->bi_size)
1016 return bio_clone(bio_src, gfpmask);
1018 if (WARN_ON_ONCE(!len))
1020 if (WARN_ON_ONCE(len > bio_src->bi_size))
1022 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1025 /* Find first affected segment... */
1028 __bio_for_each_segment(bv, bio_src, idx, 0) {
1029 if (resid < bv->bv_len)
1031 resid -= bv->bv_len;
1035 /* ...and the last affected segment */
1038 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1039 if (resid <= bv->bv_len)
1041 resid -= bv->bv_len;
1043 vcnt = end_idx - idx + 1;
1045 /* Build the clone */
1047 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1049 return NULL; /* ENOMEM */
1051 bio->bi_bdev = bio_src->bi_bdev;
1052 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1053 bio->bi_rw = bio_src->bi_rw;
1054 bio->bi_flags |= 1 << BIO_CLONED;
1057 * Copy over our part of the bio_vec, then update the first
1058 * and last (or only) entries.
1060 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1061 vcnt * sizeof (struct bio_vec));
1062 bio->bi_io_vec[0].bv_offset += voff;
1064 bio->bi_io_vec[0].bv_len -= voff;
1065 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1067 bio->bi_io_vec[0].bv_len = len;
1070 bio->bi_vcnt = vcnt;
/*
 * NOTE(review): extraction damage — fused line numbers; dropped lines
 * include the parameter tail (len, gfpmask), the "end" chain-tail
 * pointer setup, the while-loop head, the chain linking (*end = bio;
 * advance), the loop bookkeeping that updates *bio_src / *offset on
 * success, the "return chain;" and the out_err label.  Do not compile
 * as-is; restore from upstream drivers/block/rbd.c.
 */
1078 * Clone a portion of a bio chain, starting at the given byte offset
1079 * into the first bio in the source chain and continuing for the
1080 * number of bytes indicated. The result is another bio chain of
1081 * exactly the given length, or a null pointer on error.
1083 * The bio_src and offset parameters are both in-out. On entry they
1084 * refer to the first source bio and the offset into that bio where
1085 * the start of data to be cloned is located.
1087 * On return, bio_src is updated to refer to the bio in the source
1088 * chain that contains first un-cloned byte, and *offset will
1089 * contain the offset of that byte within that bio.
1091 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1092 unsigned int *offset,
1096 struct bio *bi = *bio_src;
1097 unsigned int off = *offset;
1098 struct bio *chain = NULL;
1101 /* Build up a chain of clone bios up to the limit */
1103 if (!bi || off >= bi->bi_size || !len)
1104 return NULL; /* Nothing to clone */
1108 unsigned int bi_size;
1112 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1113 goto out_err; /* EINVAL; ran out of bio's */
1115 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1116 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1118 goto out_err; /* ENOMEM */
1121 end = &bio->bi_next;
1124 if (off == bi->bi_size) {
1135 bio_chain_put(chain);
1141 * The default/initial value for all object request flags is 0. For
1142 * each flag, once its value is set to 1 it is never reset to 0
1145 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1147 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1148 struct rbd_device *rbd_dev;
1150 rbd_dev = obj_request->img_request->rbd_dev;
1151 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1156 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1159 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1162 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1164 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1165 struct rbd_device *rbd_dev = NULL;
1167 if (obj_request_img_data_test(obj_request))
1168 rbd_dev = obj_request->img_request->rbd_dev;
1169 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1174 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1177 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1181 * This sets the KNOWN flag after (possibly) setting the EXISTS
1182 * flag. The latter is set based on the "exists" value provided.
1184 * Note that for our purposes once an object exists it never goes
1185 * away again. It's possible that the response from two existence
1186 * checks are separated by the creation of the target object, and
1187 * the first ("doesn't exist") response arrives *after* the second
1188 * ("does exist"). In that case we ignore the second one.
1190 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1194 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1195 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1199 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1202 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1205 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1208 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1211 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1213 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1214 atomic_read(&obj_request->kref.refcount));
1215 kref_get(&obj_request->kref);
1218 static void rbd_obj_request_destroy(struct kref *kref);
1219 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1221 rbd_assert(obj_request != NULL);
1222 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1223 atomic_read(&obj_request->kref.refcount));
1224 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1227 static void rbd_img_request_get(struct rbd_img_request *img_request)
1229 dout("%s: img %p (was %d)\n", __func__, img_request,
1230 atomic_read(&img_request->kref.refcount));
1231 kref_get(&img_request->kref);
1234 static void rbd_img_request_destroy(struct kref *kref);
1235 static void rbd_img_request_put(struct rbd_img_request *img_request)
1237 rbd_assert(img_request != NULL);
1238 dout("%s: img %p (was %d)\n", __func__, img_request,
1239 atomic_read(&img_request->kref.refcount));
1240 kref_put(&img_request->kref, rbd_img_request_destroy);
1243 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1244 struct rbd_obj_request *obj_request)
1246 rbd_assert(obj_request->img_request == NULL);
1248 /* Image request now owns object's original reference */
1249 obj_request->img_request = img_request;
1250 obj_request->which = img_request->obj_request_count;
1251 rbd_assert(!obj_request_img_data_test(obj_request));
1252 obj_request_img_data_set(obj_request);
1253 rbd_assert(obj_request->which != BAD_WHICH);
1254 img_request->obj_request_count++;
1255 list_add_tail(&obj_request->links, &img_request->obj_requests);
1256 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1257 obj_request->which);
1260 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1261 struct rbd_obj_request *obj_request)
1263 rbd_assert(obj_request->which != BAD_WHICH);
1265 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1266 obj_request->which);
1267 list_del(&obj_request->links);
1268 rbd_assert(img_request->obj_request_count > 0);
1269 img_request->obj_request_count--;
1270 rbd_assert(obj_request->which == img_request->obj_request_count);
1271 obj_request->which = BAD_WHICH;
1272 rbd_assert(obj_request_img_data_test(obj_request));
1273 rbd_assert(obj_request->img_request == img_request);
1274 obj_request->img_request = NULL;
1275 obj_request->callback = NULL;
1276 rbd_obj_request_put(obj_request);
1279 static bool obj_request_type_valid(enum obj_request_type type)
1282 case OBJ_REQUEST_NODATA:
1283 case OBJ_REQUEST_BIO:
1284 case OBJ_REQUEST_PAGES:
1291 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1292 struct rbd_obj_request *obj_request)
1294 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1296 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1299 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1302 dout("%s: img %p\n", __func__, img_request);
1305 * If no error occurred, compute the aggregate transfer
1306 * count for the image request. We could instead use
1307 * atomic64_cmpxchg() to update it as each object request
1308 * completes; not clear which way is better off hand.
1310 if (!img_request->result) {
1311 struct rbd_obj_request *obj_request;
1314 for_each_obj_request(img_request, obj_request)
1315 xferred += obj_request->xferred;
1316 img_request->xferred = xferred;
1319 if (img_request->callback)
1320 img_request->callback(img_request);
1322 rbd_img_request_put(img_request);
1325 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1327 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1329 dout("%s: obj %p\n", __func__, obj_request);
1331 return wait_for_completion_interruptible(&obj_request->completion);
1335 * The default/initial value for all image request flags is 0. Each
1336 * is conditionally set to 1 at image request initialization time
1337 * and currently never change thereafter.
1339 static void img_request_write_set(struct rbd_img_request *img_request)
1341 set_bit(IMG_REQ_WRITE, &img_request->flags);
1345 static bool img_request_write_test(struct rbd_img_request *img_request)
1348 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1351 static void img_request_child_set(struct rbd_img_request *img_request)
1353 set_bit(IMG_REQ_CHILD, &img_request->flags);
1357 static bool img_request_child_test(struct rbd_img_request *img_request)
1360 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
/* Mark the image request as layered, i.e. its device has a parent image. */
1363 static void img_request_layered_set(struct rbd_img_request *img_request)
1365 set_bit(IMG_REQ_LAYERED, &img_request->flags);
/* Return true if the image request targets a layered (cloned) image. */
1369 static bool img_request_layered_test(struct rbd_img_request *img_request)
1372 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Fix up a completed read performed on behalf of an image request:
 * zero-fill holes (-ENOENT) and the tail of short reads, report the
 * full requested length as transferred, and mark the request done.
 */
1376 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1378 u64 xferred = obj_request->xferred;
1379 u64 length = obj_request->length;
1381 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1382 obj_request, obj_request->img_request, obj_request->result,
1385 * ENOENT means a hole in the image. We zero-fill the
1386 * entire length of the request. A short read also implies
1387 * zero-fill to the end of the request. Either way we
1388 * update the xferred count to indicate the whole request
1391 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1392 if (obj_request->result == -ENOENT) {
1393 if (obj_request->type == OBJ_REQUEST_BIO)
1394 zero_bio_chain(obj_request->bio_list, 0);
1396 zero_pages(obj_request->pages, 0, length);
1397 obj_request->result = 0;
1398 obj_request->xferred = length;
1399 } else if (xferred < length && !obj_request->result) {
1400 if (obj_request->type == OBJ_REQUEST_BIO)
1401 zero_bio_chain(obj_request->bio_list, xferred);
1403 zero_pages(obj_request->pages, xferred, length);
1404 obj_request->xferred = length;
1406 obj_request_done_set(obj_request);
/*
 * Finish an object request: run its callback (if registered) and wake
 * every waiter blocked in rbd_obj_request_wait().
 */
1409 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1411 dout("%s: obj %p cb %p\n", __func__, obj_request,
1412 obj_request->callback);
1413 if (obj_request->callback)
1414 obj_request->callback(obj_request);
1416 complete_all(&obj_request->completion);
/* For osd ops needing no post-processing: simply mark the request done. */
1419 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1421 dout("%s: obj %p\n", __func__, obj_request);
1422 obj_request_done_set(obj_request);
/*
 * Handle completion of a READ osd op.  For a layered image, a read of
 * an object that does not exist (-ENOENT) within the parent overlap is
 * redirected to the parent image via rbd_img_parent_read(); otherwise
 * image reads get the usual hole/short-read fix-up.
 */
1425 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1427 struct rbd_img_request *img_request = NULL;
1428 struct rbd_device *rbd_dev = NULL;
1429 bool layered = false;
1431 if (obj_request_img_data_test(obj_request)) {
1432 img_request = obj_request->img_request;
1433 layered = img_request && img_request_layered_test(img_request);
1434 rbd_dev = img_request->rbd_dev;
1437 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1438 obj_request, img_request, obj_request->result,
1439 obj_request->xferred, obj_request->length);
1440 if (layered && obj_request->result == -ENOENT &&
1441 obj_request->img_offset < rbd_dev->parent_overlap)
1442 rbd_img_parent_read(obj_request);
1443 else if (img_request)
1444 rbd_img_obj_request_read_callback(obj_request);
1446 obj_request_done_set(obj_request);
/*
 * Handle completion of a WRITE osd op.  Writes never complete short,
 * so report the originally requested length as transferred.
 */
1449 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1451 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1452 obj_request->result, obj_request->length);
1454 * There is no such thing as a successful short write. Set
1455 * it to our originally-requested length.
1457 obj_request->xferred = obj_request->length;
1458 obj_request_done_set(obj_request);
1462 * For a simple stat call there's nothing to do. We'll do more if
1463 * this is part of a write sequence for a layered image.
1465 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1467 dout("%s: obj %p\n", __func__, obj_request);
1468 obj_request_done_set(obj_request);
/*
 * Main completion callback for osd requests issued by rbd.  Records
 * the result and version, then dispatches to the per-op handler based
 * on the FIRST op's opcode (a request may carry up to two ops); if the
 * handler marked the object request done, completes it.
 */
1471 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1472 struct ceph_msg *msg)
1474 struct rbd_obj_request *obj_request = osd_req->r_priv;
1477 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1478 rbd_assert(osd_req == obj_request->osd_req);
1479 if (obj_request_img_data_test(obj_request)) {
1480 rbd_assert(obj_request->img_request);
1481 rbd_assert(obj_request->which != BAD_WHICH);
1483 rbd_assert(obj_request->which == BAD_WHICH);
1486 if (osd_req->r_result < 0)
1487 obj_request->result = osd_req->r_result;
1488 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1490 BUG_ON(osd_req->r_num_ops > 2);
1493 * We support a 64-bit length, but ultimately it has to be
1494 * passed to blk_end_request(), which takes an unsigned int.
1496 obj_request->xferred = osd_req->r_reply_op_len[0];
1497 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1498 opcode = osd_req->r_ops[0].op;
1500 case CEPH_OSD_OP_READ:
1501 rbd_osd_read_callback(obj_request);
1503 case CEPH_OSD_OP_WRITE:
1504 rbd_osd_write_callback(obj_request);
1506 case CEPH_OSD_OP_STAT:
1507 rbd_osd_stat_callback(obj_request);
1509 case CEPH_OSD_OP_CALL:
1510 case CEPH_OSD_OP_NOTIFY_ACK:
1511 case CEPH_OSD_OP_WATCH:
1512 rbd_osd_trivial_callback(obj_request);
1515 rbd_warn(NULL, "%s: unsupported op %hu\n",
1516 obj_request->object_name, (unsigned short) opcode);
1520 if (obj_request_done_test(obj_request))
1521 rbd_obj_request_complete(obj_request);
/*
 * Finalize a read osd request: no snapshot context, and the snapshot
 * id comes from the owning image request (CEPH_NOSNAP if standalone).
 */
1524 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1526 struct rbd_img_request *img_request = obj_request->img_request;
1527 struct ceph_osd_request *osd_req = obj_request->osd_req;
1530 rbd_assert(osd_req != NULL);
1532 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1533 ceph_osdc_build_request(osd_req, obj_request->offset,
1534 NULL, snap_id, NULL);
/*
 * Finalize a write osd request: supply the image request's snapshot
 * context (NULL if standalone) and the current time as mtime.
 */
1537 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1539 struct rbd_img_request *img_request = obj_request->img_request;
1540 struct ceph_osd_request *osd_req = obj_request->osd_req;
1541 struct ceph_snap_context *snapc;
1542 struct timespec mtime = CURRENT_TIME;
1544 rbd_assert(osd_req != NULL);
1546 snapc = img_request ? img_request->snapc : NULL;
1547 ceph_osdc_build_request(osd_req, obj_request->offset,
1548 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate and initialize a single-op osd request for the given object
 * request, setting flags, callback, object name and file layout.
 * Returns NULL on allocation failure.
 */
1551 static struct ceph_osd_request *rbd_osd_req_create(
1552 struct rbd_device *rbd_dev,
1554 struct rbd_obj_request *obj_request)
1556 struct ceph_snap_context *snapc = NULL;
1557 struct ceph_osd_client *osdc;
1558 struct ceph_osd_request *osd_req;
1560 if (obj_request_img_data_test(obj_request)) {
1561 struct rbd_img_request *img_request = obj_request->img_request;
1563 rbd_assert(write_request ==
1564 img_request_write_test(img_request));
1566 snapc = img_request->snapc;
1569 /* Allocate and initialize the request, for the single op */
1571 osdc = &rbd_dev->rbd_client->client->osdc;
1572 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1574 return NULL; /* ENOMEM */
1577 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1579 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1581 osd_req->r_callback = rbd_osd_req_callback;
1582 osd_req->r_priv = obj_request;
1584 osd_req->r_oid_len = strlen(obj_request->object_name);
1585 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1586 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1588 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1594 * Create a copyup osd request based on the information in the
1595 * object request supplied. A copyup method call, and a "normal" write request.
1596 * a copyup method call, and a "normal" write request.
1598 static struct ceph_osd_request *
1599 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1601 struct rbd_img_request *img_request;
1602 struct ceph_snap_context *snapc;
1603 struct rbd_device *rbd_dev;
1604 struct ceph_osd_client *osdc;
1605 struct ceph_osd_request *osd_req;
1607 rbd_assert(obj_request_img_data_test(obj_request));
1608 img_request = obj_request->img_request;
1609 rbd_assert(img_request);
1610 rbd_assert(img_request_write_test(img_request));
1612 /* Allocate and initialize the request, for the two ops */
1614 snapc = img_request->snapc;
1615 rbd_dev = img_request->rbd_dev;
1616 osdc = &rbd_dev->rbd_client->client->osdc;
1617 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1619 return NULL; /* ENOMEM */
1621 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1622 osd_req->r_callback = rbd_osd_req_callback;
1623 osd_req->r_priv = obj_request;
1625 osd_req->r_oid_len = strlen(obj_request->object_name);
1626 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1627 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1629 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Drop rbd's reference on an osd request (freed when the last ref goes). */
1635 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1637 ceph_osdc_put_request(osd_req);
1640 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate an object request covering [offset, offset+length) of the
 * named object.  The object name is copied into the same allocation,
 * immediately after the struct.  Returns NULL on allocation failure.
 */
1642 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1643 u64 offset, u64 length,
1644 enum obj_request_type type)
1646 struct rbd_obj_request *obj_request;
1650 rbd_assert(obj_request_type_valid(type));
1652 size = strlen(object_name) + 1;
1653 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1657 name = (char *)(obj_request + 1);
1658 obj_request->object_name = memcpy(name, object_name, size);
1659 obj_request->offset = offset;
1660 obj_request->length = length;
1661 obj_request->flags = 0;
1662 obj_request->which = BAD_WHICH;
1663 obj_request->type = type;
1664 INIT_LIST_HEAD(&obj_request->links);
1665 init_completion(&obj_request->completion);
1666 kref_init(&obj_request->kref);
1668 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1669 offset, length, (int)type, obj_request);
/*
 * kref release function for object requests: frees the osd request and
 * any attached data (bio chain or page vector) according to the
 * request's type.  The request must already be detached from any image
 * request (asserted below).
 */
1674 static void rbd_obj_request_destroy(struct kref *kref)
1676 struct rbd_obj_request *obj_request;
1678 obj_request = container_of(kref, struct rbd_obj_request, kref);
1680 dout("%s: obj %p\n", __func__, obj_request);
1682 rbd_assert(obj_request->img_request == NULL);
1683 rbd_assert(obj_request->which == BAD_WHICH);
1685 if (obj_request->osd_req)
1686 rbd_osd_req_destroy(obj_request->osd_req);
1688 rbd_assert(obj_request_type_valid(obj_request->type));
1689 switch (obj_request->type) {
1690 case OBJ_REQUEST_NODATA:
1691 break; /* Nothing to do */
1692 case OBJ_REQUEST_BIO:
1693 if (obj_request->bio_list)
1694 bio_chain_put(obj_request->bio_list);
1696 case OBJ_REQUEST_PAGES:
1697 if (obj_request->pages)
1698 ceph_release_page_vector(obj_request->pages,
1699 obj_request->page_count)
1707 * Caller is responsible for filling in the list of object requests
1708 * that comprises the image request, and the Linux request pointer
1709 * (if there is one).
1711 static struct rbd_img_request *rbd_img_request_create(
1712 struct rbd_device *rbd_dev,
1713 u64 offset, u64 length,
1717 struct rbd_img_request *img_request;
1719 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
/*
 * For writes, take a reference on the device's current snapshot
 * context under the header semaphore so it stays valid for the
 * lifetime of this image request.
 */
1723 if (write_request) {
1724 down_read(&rbd_dev->header_rwsem);
1725 ceph_get_snap_context(rbd_dev->header.snapc);
1726 up_read(&rbd_dev->header_rwsem);
1729 img_request->rq = NULL;
1730 img_request->rbd_dev = rbd_dev;
1731 img_request->offset = offset;
1732 img_request->length = length;
1733 img_request->flags = 0;
1734 if (write_request) {
1735 img_request_write_set(img_request);
1736 img_request->snapc = rbd_dev->header.snapc;
1738 img_request->snap_id = rbd_dev->spec->snap_id;
1741 img_request_child_set(img_request);
1742 if (rbd_dev->parent_spec)
1743 img_request_layered_set(img_request);
1744 spin_lock_init(&img_request->completion_lock);
1745 img_request->next_completion = 0;
1746 img_request->callback = NULL;
1747 img_request->result = 0;
1748 img_request->obj_request_count = 0;
1749 INIT_LIST_HEAD(&img_request->obj_requests);
1750 kref_init(&img_request->kref);
1752 rbd_img_request_get(img_request); /* Avoid a warning */
1753 rbd_img_request_put(img_request); /* TEMPORARY */
1755 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756 write_request ? "write" : "read", offset, length,
/*
 * kref release function for image requests: detaches and drops every
 * component object request, releases the snapshot context taken for a
 * write, and (for a child request) drops the reference on the object
 * request it was issued on behalf of.
 */
1762 static void rbd_img_request_destroy(struct kref *kref)
1764 struct rbd_img_request *img_request;
1765 struct rbd_obj_request *obj_request;
1766 struct rbd_obj_request *next_obj_request;
1768 img_request = container_of(kref, struct rbd_img_request, kref);
1770 dout("%s: img %p\n", __func__, img_request);
1772 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1773 rbd_img_obj_request_del(img_request, obj_request);
1774 rbd_assert(img_request->obj_request_count == 0);
1776 if (img_request_write_test(img_request))
1777 ceph_put_snap_context(img_request->snapc);
1779 if (img_request_child_test(img_request))
1780 rbd_obj_request_put(img_request->obj_request);
/*
 * Account for one completed object request within its image request:
 * log and record a failure, then propagate completion either up to the
 * parent object request (child image request) or to the block layer
 * via blk_end_request().  Returns true while more of the image request
 * remains outstanding.
 */
1785 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1787 struct rbd_img_request *img_request;
1788 unsigned int xferred;
1792 rbd_assert(obj_request_img_data_test(obj_request));
1793 img_request = obj_request->img_request;
1795 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1796 xferred = (unsigned int)obj_request->xferred;
1797 result = obj_request->result;
1799 struct rbd_device *rbd_dev = img_request->rbd_dev;
1801 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1802 img_request_write_test(img_request) ? "write" : "read",
1803 obj_request->length, obj_request->img_offset,
1804 obj_request->offset);
1805 rbd_warn(rbd_dev, " result %d xferred %x\n",
1807 if (!img_request->result)
1808 img_request->result = result;
1811 /* Image object requests don't own their page array */
1813 if (obj_request->type == OBJ_REQUEST_PAGES) {
1814 obj_request->pages = NULL;
1815 obj_request->page_count = 0;
1818 if (img_request_child_test(img_request)) {
1819 rbd_assert(img_request->obj_request != NULL);
1820 more = obj_request->which < img_request->obj_request_count - 1;
1822 rbd_assert(img_request->rq != NULL);
1823 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Completion callback for object requests belonging to an image
 * request.  Object requests must be retired strictly in order, so
 * under the completion lock this walks forward from the next expected
 * request, ending each one that is done; when nothing more remains the
 * whole image request is completed.
 */
1829 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1831 struct rbd_img_request *img_request;
1832 u32 which = obj_request->which;
1835 rbd_assert(obj_request_img_data_test(obj_request));
1836 img_request = obj_request->img_request;
1838 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1839 rbd_assert(img_request != NULL);
1840 rbd_assert(img_request->obj_request_count > 0);
1841 rbd_assert(which != BAD_WHICH);
1842 rbd_assert(which < img_request->obj_request_count);
1843 rbd_assert(which >= img_request->next_completion);
1845 spin_lock_irq(&img_request->completion_lock);
1846 if (which != img_request->next_completion)
1849 for_each_obj_request_from(img_request, obj_request) {
1851 rbd_assert(which < img_request->obj_request_count);
1853 if (!obj_request_done_test(obj_request))
1855 more = rbd_img_obj_end_request(obj_request);
1859 rbd_assert(more ^ (which == img_request->obj_request_count));
1860 img_request->next_completion = which;
1862 spin_unlock_irq(&img_request->completion_lock);
1865 rbd_img_request_complete(img_request);
1869 * Split up an image request into one or more object requests, each
1870 * to a different object. The "type" parameter indicates whether
1871 * "data_desc" is the pointer to the head of a list of bio
1872 * structures, or the base of a page array. In either case this
1873 * function assumes data_desc describes memory sufficient to hold
1874 * all data described by the image request.
1876 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1877 enum obj_request_type type,
1880 struct rbd_device *rbd_dev = img_request->rbd_dev;
1881 struct rbd_obj_request *obj_request = NULL;
1882 struct rbd_obj_request *next_obj_request;
1883 bool write_request = img_request_write_test(img_request);
1884 struct bio *bio_list;
1885 unsigned int bio_offset = 0;
1886 struct page **pages;
1891 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1892 (int)type, data_desc);
1894 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1895 img_offset = img_request->offset;
1896 resid = img_request->length;
1897 rbd_assert(resid > 0);
1899 if (type == OBJ_REQUEST_BIO) {
1900 bio_list = data_desc;
1901 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1903 rbd_assert(type == OBJ_REQUEST_PAGES);
/*
 * Per-segment loop: map the current image offset to an object name,
 * per-object offset and length, then build an object request carrying
 * the matching slice of the bio chain or page array.
 */
1908 struct ceph_osd_request *osd_req;
1909 const char *object_name;
1913 object_name = rbd_segment_name(rbd_dev, img_offset);
1916 offset = rbd_segment_offset(rbd_dev, img_offset);
1917 length = rbd_segment_length(rbd_dev, img_offset, resid);
1918 obj_request = rbd_obj_request_create(object_name,
1919 offset, length, type);
1920 kfree(object_name); /* object request has its own copy */
1924 if (type == OBJ_REQUEST_BIO) {
1925 unsigned int clone_size;
1927 rbd_assert(length <= (u64)UINT_MAX);
1928 clone_size = (unsigned int)length;
1929 obj_request->bio_list =
1930 bio_chain_clone_range(&bio_list,
1934 if (!obj_request->bio_list)
1937 unsigned int page_count;
1939 obj_request->pages = pages;
1940 page_count = (u32)calc_pages_for(offset, length);
1941 obj_request->page_count = page_count;
1942 if ((offset + length) & ~PAGE_MASK)
1943 page_count--; /* more on last page */
1944 pages += page_count;
1947 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1951 obj_request->osd_req = osd_req;
1952 obj_request->callback = rbd_img_obj_callback;
1954 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1956 if (type == OBJ_REQUEST_BIO)
1957 osd_req_op_extent_osd_data_bio(osd_req, 0,
1958 obj_request->bio_list, length);
1960 osd_req_op_extent_osd_data_pages(osd_req, 0,
1961 obj_request->pages, length,
1962 offset & ~PAGE_MASK, false, false);
1965 rbd_osd_req_format_write(obj_request);
1967 rbd_osd_req_format_read(obj_request);
1969 obj_request->img_offset = img_offset;
1970 rbd_img_obj_request_add(img_request, obj_request);
1972 img_offset += length;
/* Error path: drop the partially built request and any added so far. */
1979 rbd_obj_request_put(obj_request);
1981 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1982 rbd_obj_request_put(obj_request);
/*
 * Completion callback for a copyup request: release the page vector
 * that held the parent data (sized to one full object, per obj_order),
 * fix up the transfer count, and fall through to the normal image
 * object completion path.
 */
1988 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1990 struct rbd_img_request *img_request;
1991 struct rbd_device *rbd_dev;
1995 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1996 rbd_assert(obj_request_img_data_test(obj_request));
1997 img_request = obj_request->img_request;
1998 rbd_assert(img_request);
2000 rbd_dev = img_request->rbd_dev;
2001 rbd_assert(rbd_dev);
2002 length = (u64)1 << rbd_dev->header.obj_order;
2003 page_count = (u32)calc_pages_for(0, length);
2005 rbd_assert(obj_request->copyup_pages);
2006 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2007 obj_request->copyup_pages = NULL;
2010 * We want the transfer count to reflect the size of the
2011 * original write request. There is no such thing as a
2012 * successful short write, so if the request was successful
2013 * we can just set it to the originally-requested length.
2015 if (!obj_request->result)
2016 obj_request->xferred = obj_request->length;
2018 /* Finish up with the normal image object callback */
2020 rbd_img_obj_callback(obj_request);
/*
 * Called when the full-object read from the parent image completes.
 * Takes ownership of the copyup pages, then builds and submits a
 * two-op osd request on the original object request: op 0 is the
 * "rbd/copyup" class method carrying the parent data, op 1 is the
 * original write.  On any error the original request is failed and
 * completed here.
 */
2024 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2026 struct rbd_obj_request *orig_request;
2027 struct ceph_osd_request *osd_req;
2028 struct ceph_osd_client *osdc;
2029 struct rbd_device *rbd_dev;
2030 struct page **pages;
2035 rbd_assert(img_request_child_test(img_request));
2037 /* First get what we need from the image request */
2039 pages = img_request->copyup_pages;
2040 rbd_assert(pages != NULL);
2041 img_request->copyup_pages = NULL;
2043 orig_request = img_request->obj_request;
2044 rbd_assert(orig_request != NULL);
2045 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2046 result = img_request->result;
2047 obj_size = img_request->length;
2048 xferred = img_request->xferred;
2050 rbd_dev = img_request->rbd_dev;
2051 rbd_assert(rbd_dev);
2052 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2054 rbd_img_request_put(img_request);
2059 /* Allocate the new copyup osd request for the original request */
2062 rbd_assert(!orig_request->osd_req);
2063 osd_req = rbd_osd_req_create_copyup(orig_request);
2066 orig_request->osd_req = osd_req;
2067 orig_request->copyup_pages = pages;
2069 /* Initialize the copyup op */
2071 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2075 /* Then the original write request op */
2077 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078 orig_request->offset,
2079 orig_request->length, 0, 0);
2080 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081 orig_request->length);
2083 rbd_osd_req_format_write(orig_request);
2085 /* All set, send it off. */
2087 orig_request->callback = rbd_img_obj_copyup_callback;
2088 osdc = &rbd_dev->rbd_client->client->osdc;
2089 result = rbd_obj_request_submit(osdc, orig_request);
2093 /* Record the error code and complete the request */
2095 orig_request->result = result;
2096 orig_request->xferred = 0;
2097 obj_request_done_set(orig_request);
2098 rbd_obj_request_complete(orig_request);
2102 * Read from the parent image the range of data that covers the
2103 * entire target of the given object request. This is used for
2104 * satisfying a layered image write request when the target of an
2105 * object request from the image request does not exist.
2107 * A page array big enough to hold the returned data is allocated
2108 * and supplied to rbd_img_request_fill() as the "data descriptor."
2109 * When the read completes, this page array will be transferred to
2110 * the original object request for the copyup operation.
2112 * If an error occurs, record it as the result of the original
2113 * object request and mark it done so it gets completed.
2115 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2117 struct rbd_img_request *img_request = NULL;
2118 struct rbd_img_request *parent_request = NULL;
2119 struct rbd_device *rbd_dev;
2122 struct page **pages = NULL;
2126 rbd_assert(obj_request_img_data_test(obj_request));
2127 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2129 img_request = obj_request->img_request;
2130 rbd_assert(img_request != NULL);
2131 rbd_dev = img_request->rbd_dev;
2132 rbd_assert(rbd_dev->parent != NULL);
2135 * First things first. The original osd request is of no
2136 * use to us any more, we'll need a new one that can hold
2137 * the two ops in a copyup request. We'll get that later,
2138 * but for now we can release the old one.
2140 rbd_osd_req_destroy(obj_request->osd_req);
2141 obj_request->osd_req = NULL;
2144 * Determine the byte range covered by the object in the
2145 * child image to which the original request was to be sent.
2147 img_offset = obj_request->img_offset - obj_request->offset;
2148 length = (u64)1 << rbd_dev->header.obj_order;
2151 * There is no defined parent data beyond the parent
2152 * overlap, so limit what we read at that boundary if
2155 if (img_offset + length > rbd_dev->parent_overlap) {
2156 rbd_assert(img_offset < rbd_dev->parent_overlap);
2157 length = rbd_dev->parent_overlap - img_offset;
2161 * Allocate a page array big enough to receive the data read
2164 page_count = (u32)calc_pages_for(0, length);
2165 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2166 if (IS_ERR(pages)) {
2167 result = PTR_ERR(pages);
/* Issue a child read image request against the parent device. */
2173 parent_request = rbd_img_request_create(rbd_dev->parent,
2176 if (!parent_request)
2178 rbd_obj_request_get(obj_request);
2179 parent_request->obj_request = obj_request;
2181 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2184 parent_request->copyup_pages = pages;
2186 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2187 result = rbd_img_request_submit(parent_request);
/* Error path: undo the linkage, free resources, fail the original. */
2191 parent_request->copyup_pages = NULL;
2192 parent_request->obj_request = NULL;
2193 rbd_obj_request_put(obj_request);
2196 ceph_release_page_vector(pages, page_count);
2198 rbd_img_request_put(parent_request);
2199 obj_request->result = result;
2200 obj_request->xferred = 0;
2201 obj_request_done_set(obj_request);
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists on the original request, then resubmits the original; any
 * error other than -ENOENT is propagated and completes it instead.
 */
2206 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2208 struct rbd_obj_request *orig_request;
2211 rbd_assert(!obj_request_img_data_test(obj_request));
2214 * All we need from the object request is the original
2215 * request and the result of the STAT op. Grab those, then
2216 * we're done with the request.
2218 orig_request = obj_request->obj_request;
2219 obj_request->obj_request = NULL;
2220 rbd_assert(orig_request);
2221 rbd_assert(orig_request->img_request);
2223 result = obj_request->result;
2224 obj_request->result = 0;
2226 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227 obj_request, orig_request, result,
2228 obj_request->xferred, obj_request->length);
2229 rbd_obj_request_put(obj_request);
2231 rbd_assert(orig_request);
2232 rbd_assert(orig_request->img_request);
2235 * Our only purpose here is to determine whether the object
2236 * exists, and we don't want to treat the non-existence as
2237 * an error. If something else comes back, transfer the
2238 * error to the original request and complete it now.
2241 obj_request_existence_set(orig_request, true);
2242 } else if (result == -ENOENT) {
2243 obj_request_existence_set(orig_request, false);
2244 } else if (result) {
2245 orig_request->result = result;
2250 * Resubmit the original request now that we have recorded
2251 * whether the target object exists.
2253 orig_request->result = rbd_img_obj_request_submit(orig_request);
2255 if (orig_request->result)
2256 rbd_obj_request_complete(orig_request);
2257 rbd_obj_request_put(orig_request);
/*
 * Issue a STAT osd request to find out whether the target object of
 * the given image object request exists.  The stat request holds a
 * reference to the original request and reports back through
 * rbd_img_obj_exists_callback().
 */
2260 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2262 struct rbd_obj_request *stat_request;
2263 struct rbd_device *rbd_dev;
2264 struct ceph_osd_client *osdc;
2265 struct page **pages = NULL;
2271 * The response data for a STAT call consists of:
/* (size, mtime) encoded as a __le64 plus two __le32 values */
2278 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279 page_count = (u32)calc_pages_for(0, size);
2280 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2282 return PTR_ERR(pages);
2285 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2290 rbd_obj_request_get(obj_request);
2291 stat_request->obj_request = obj_request;
2292 stat_request->pages = pages;
2293 stat_request->page_count = page_count;
2295 rbd_assert(obj_request->img_request);
2296 rbd_dev = obj_request->img_request->rbd_dev;
2297 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2299 if (!stat_request->osd_req)
2301 stat_request->callback = rbd_img_obj_exists_callback;
2303 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2306 rbd_osd_req_format_read(stat_request);
2308 osdc = &rbd_dev->rbd_client->client->osdc;
2309 ret = rbd_obj_request_submit(osdc, stat_request);
2312 rbd_obj_request_put(obj_request);
/*
 * Submit one object request belonging to an image request, choosing
 * among three paths: a plain osd submit (reads, non-layered writes,
 * writes beyond the parent overlap, or writes to objects known to
 * exist), a full-object parent read for copyup (object known absent),
 * or an existence check when the object's status is unknown.
 */
2317 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2319 struct rbd_img_request *img_request;
2320 struct rbd_device *rbd_dev;
2323 rbd_assert(obj_request_img_data_test(obj_request));
2325 img_request = obj_request->img_request;
2326 rbd_assert(img_request);
2327 rbd_dev = img_request->rbd_dev;
2330 * Only writes to layered images need special handling.
2331 * Reads and non-layered writes are simple object requests.
2332 * Layered writes that start beyond the end of the overlap
2333 * with the parent have no parent data, so they too are
2334 * simple object requests. Finally, if the target object is
2335 * known to already exist, its parent data has already been
2336 * copied, so a write to the object can also be handled as a
2337 * simple object request.
2339 if (!img_request_write_test(img_request) ||
2340 !img_request_layered_test(img_request) ||
2341 rbd_dev->parent_overlap <= obj_request->img_offset ||
2342 ((known = obj_request_known_test(obj_request)) &&
2343 obj_request_exists_test(obj_request))) {
2345 struct rbd_device *rbd_dev;
2346 struct ceph_osd_client *osdc;
2348 rbd_dev = obj_request->img_request->rbd_dev;
2349 osdc = &rbd_dev->rbd_client->client->osdc;
2351 return rbd_obj_request_submit(osdc, obj_request);
2355 * It's a layered write. The target object might exist but
2356 * we may not know that yet. If we know it doesn't exist,
2357 * start by reading the data for the full target object from
2358 * the parent so we can use it for a copyup to the target.
2361 return rbd_img_obj_parent_read_full(obj_request);
2363 /* We don't know whether the target exists. Go find out. */
2365 return rbd_img_obj_exists_submit(obj_request);
/*
 * Submit every object request that makes up an image request.
 * Stops on the first submission failure.
 */
2368 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2370 struct rbd_obj_request *obj_request;
2371 struct rbd_obj_request *next_obj_request;
2373 dout("%s: img %p\n", __func__, img_request);
2374 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2377 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for a read redirected to the parent image.
 * Transfers the result to the original object request, clamping the
 * transfer count at the parent overlap boundary so the zero-fill logic
 * in rbd_img_obj_request_read_callback() treats anything beyond the
 * overlap as a short read.
 */
2385 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2387 struct rbd_obj_request *obj_request;
2388 struct rbd_device *rbd_dev;
2391 rbd_assert(img_request_child_test(img_request));
2393 obj_request = img_request->obj_request;
2394 rbd_assert(obj_request);
2395 rbd_assert(obj_request->img_request);
2397 obj_request->result = img_request->result;
2398 if (obj_request->result)
2402 * We need to zero anything beyond the parent overlap
2403 * boundary. Since rbd_img_obj_request_read_callback()
2404 * will zero anything beyond the end of a short read, an
2405 * easy way to do this is to pretend the data from the
2406 * parent came up short--ending at the overlap boundary.
2408 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409 obj_end = obj_request->img_offset + obj_request->length;
2410 rbd_dev = obj_request->img_request->rbd_dev;
2411 if (obj_end > rbd_dev->parent_overlap) {
2414 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415 xferred = rbd_dev->parent_overlap -
2416 obj_request->img_offset;
2418 obj_request->xferred = min(img_request->xferred, xferred);
2420 obj_request->xferred = img_request->xferred;
2423 rbd_img_obj_request_read_callback(obj_request);
2424 rbd_obj_request_complete(obj_request);
/*
 * Satisfy a read that hit a nonexistent object (-ENOENT) in a layered
 * image by issuing an equivalent child image read against the parent
 * device, reusing the original request's bio chain as the data target.
 * On failure the original request is failed and completed here.
 */
2427 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2429 struct rbd_device *rbd_dev;
2430 struct rbd_img_request *img_request;
2433 rbd_assert(obj_request_img_data_test(obj_request));
2434 rbd_assert(obj_request->img_request != NULL);
2435 rbd_assert(obj_request->result == (s32) -ENOENT);
2436 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2438 rbd_dev = obj_request->img_request->rbd_dev;
2439 rbd_assert(rbd_dev->parent != NULL);
2440 /* rbd_read_finish(obj_request, obj_request->length); */
2441 img_request = rbd_img_request_create(rbd_dev->parent,
2442 obj_request->img_offset,
2443 obj_request->length,
2449 rbd_obj_request_get(obj_request);
2450 img_request->obj_request = obj_request;
2452 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453 obj_request->bio_list);
2457 img_request->callback = rbd_img_parent_read_callback;
2458 result = rbd_img_request_submit(img_request);
/* Error path: drop the child image request and fail the original. */
2465 rbd_img_request_put(img_request);
2466 obj_request->result = result;
2467 obj_request->xferred = 0;
2468 obj_request_done_set(obj_request);
/*
 * Acknowledge a watch notification on the image header object by
 * sending a NOTIFY_ACK osd op.  The request cleans itself up via the
 * rbd_obj_request_put callback once it completes.
 */
2471 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2473 struct rbd_obj_request *obj_request;
2474 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2477 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2478 OBJ_REQUEST_NODATA);
2483 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2484 if (!obj_request->osd_req)
2486 obj_request->callback = rbd_obj_request_put;
2488 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2490 rbd_osd_req_format_read(obj_request);
2492 ret = rbd_obj_request_submit(osdc, obj_request);
2495 rbd_obj_request_put(obj_request);
/*
 * Watch-event callback for the image header object: refresh the
 * device's view of the image (result intentionally ignored), then
 * acknowledge the notification.
 */
2500 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2502 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2507 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2508 rbd_dev->header_name, (unsigned long long)notify_id,
2509 (unsigned int)opcode);
2510 (void)rbd_dev_refresh(rbd_dev);
2512 rbd_obj_notify_ack(rbd_dev, notify_id);
2516 * Request sync osd watch/unwatch. The value of "start" determines
2517 * whether a watch request is being initiated or torn down.
2519 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2521 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2522 struct rbd_obj_request *obj_request;
/* start and the presence of watch_event/watch_request must disagree */
2525 rbd_assert(start ^ !!rbd_dev->watch_event);
2526 rbd_assert(start ^ !!rbd_dev->watch_request);
2529 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2530 &rbd_dev->watch_event);
2533 rbd_assert(rbd_dev->watch_event != NULL);
2537 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2538 OBJ_REQUEST_NODATA);
2542 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2543 if (!obj_request->osd_req)
/* Starting: make the request linger; tearing down: unregister it. */
2547 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2549 ceph_osdc_unregister_linger_request(osdc,
2550 rbd_dev->watch_request->osd_req);
2552 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2553 rbd_dev->watch_event->cookie, 0, start);
2554 rbd_osd_req_format_write(obj_request);
2556 ret = rbd_obj_request_submit(osdc, obj_request);
2559 ret = rbd_obj_request_wait(obj_request);
2562 ret = obj_request->result;
2567 * A watch request is set to linger, so the underlying osd
2568 * request won't go away until we unregister it. We retain
2569 * a pointer to the object request during that time (in
2570 * rbd_dev->watch_request), so we'll keep a reference to
2571 * it. We'll drop that reference (below) after we've
2575 rbd_dev->watch_request = obj_request;
2580 /* We have successfully torn down the watch request */
2582 rbd_obj_request_put(rbd_dev->watch_request);
2583 rbd_dev->watch_request = NULL;
2585 /* Cancel the event if we're tearing down, or on error */
2586 ceph_osdc_cancel_event(rbd_dev->watch_event);
2587 rbd_dev->watch_event = NULL;
2589 rbd_obj_request_put(obj_request);
2595 * Synchronous osd object method call. Returns the number of bytes
2596 * returned in the outbound buffer, or a negative error code.
2598 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2599 const char *object_name,
2600 const char *class_name,
2601 const char *method_name,
2602 const void *outbound,
2603 size_t outbound_size,
2605 size_t inbound_size,
2608 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2609 struct rbd_obj_request *obj_request;
2610 struct page **pages;
2615 * Method calls are ultimately read operations. The result
2616 * should be placed into the inbound buffer provided. They
2617 * also supply outbound data--parameters for the object
2618 * method. Currently if this is present it will be a
/* Page vector receives the method's reply ("inbound") data. */
2621 page_count = (u32)calc_pages_for(0, inbound_size);
2622 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2624 return PTR_ERR(pages);
2627 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2632 obj_request->pages = pages;
2633 obj_request->page_count = page_count;
2635 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2636 if (!obj_request->osd_req)
2639 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2640 class_name, method_name);
2641 if (outbound_size) {
2642 struct ceph_pagelist *pagelist;
2644 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2648 ceph_pagelist_init(pagelist);
2649 ceph_pagelist_append(pagelist, outbound, outbound_size);
2650 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2653 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2654 obj_request->pages, inbound_size,
2656 rbd_osd_req_format_read(obj_request);
2658 ret = rbd_obj_request_submit(osdc, obj_request);
2661 ret = rbd_obj_request_wait(obj_request);
2665 ret = obj_request->result;
/* Success: return byte count; copy reply data to caller's buffer. */
2669 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2670 ret = (int)obj_request->xferred;
2671 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2673 *version = obj_request->version;
2676 rbd_obj_request_put(obj_request);
2678 ceph_release_page_vector(pages, page_count);
/*
 * Block-layer request function: translate each queued fs request into
 * an rbd image request and submit it.  The queue lock is dropped while
 * building/submitting and re-taken before fetching the next request.
 */
2683 static void rbd_request_fn(struct request_queue *q)
2684 __releases(q->queue_lock) __acquires(q->queue_lock)
2686 struct rbd_device *rbd_dev = q->queuedata;
2687 bool read_only = rbd_dev->mapping.read_only;
2691 while ((rq = blk_fetch_request(q))) {
2692 bool write_request = rq_data_dir(rq) == WRITE;
2693 struct rbd_img_request *img_request;
2697 /* Ignore any non-FS requests that filter through. */
2699 if (rq->cmd_type != REQ_TYPE_FS) {
2700 dout("%s: non-fs request type %d\n", __func__,
2701 (int) rq->cmd_type);
2702 __blk_end_request_all(rq, 0);
2706 /* Ignore/skip any zero-length requests */
2708 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2709 length = (u64) blk_rq_bytes(rq);
2712 dout("%s: zero-length request\n", __func__);
2713 __blk_end_request_all(rq, 0);
2717 spin_unlock_irq(q->queue_lock);
2719 /* Disallow writes to a read-only device */
2721 if (write_request) {
/* Writes are only valid against the base image, never a snapshot. */
2725 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2729 * Quit early if the mapped snapshot no longer
2730 * exists. It's still possible the snapshot will
2731 * have disappeared by the time our request arrives
2732 * at the osd, but there's no sense in sending it if
2735 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2736 dout("request for non-existent snapshot");
2737 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
/* Guard against u64 wrap of offset + length. */
2743 if (offset && length > U64_MAX - offset + 1) {
2744 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2746 goto end_request; /* Shouldn't happen */
2750 img_request = rbd_img_request_create(rbd_dev, offset, length,
2751 write_request, false);
2755 img_request->rq = rq;
2757 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2760 result = rbd_img_request_submit(img_request);
2762 rbd_img_request_put(img_request);
2764 spin_lock_irq(q->queue_lock);
2766 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2767 write_request ? "write" : "read",
2768 length, offset, result);
2770 __blk_end_request_all(rq, result);
2776 * a queue callback. Makes sure that we don't create a bio that spans across
2777 * multiple osd objects. One exception would be with a single page bios,
2778 * which we handle later at bio_chain_clone_range()
2780 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2781 struct bio_vec *bvec)
2783 struct rbd_device *rbd_dev = q->queuedata;
2784 sector_t sector_offset;
2785 sector_t sectors_per_obj;
2786 sector_t obj_sector_offset;
2790 * Find how far into its rbd object the partition-relative
2791 * bio start sector is to offset relative to the enclosing
/* Sector offset within the object; object size is a power of two. */
2794 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2795 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2796 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2799 * Compute the number of bytes from that offset to the end
2800 * of the object. Account for what's already used by the bio.
2802 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2803 if (ret > bmd->bi_size)
2804 ret -= bmd->bi_size;
2809 * Don't send back more than was asked for. And if the bio
2810 * was empty, let the whole thing through because: "Note
2811 * that a block device *must* allow a single page to be
2812 * added to an empty bio."
2814 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2815 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2816 ret = (int) bvec->bv_len;
/* Tear down the gendisk and its request queue for this device. */
2821 static void rbd_free_disk(struct rbd_device *rbd_dev)
2823 struct gendisk *disk = rbd_dev->disk;
2828 rbd_dev->disk = NULL;
/* Only a disk that was actually added (GENHD_FL_UP) needs removal. */
2829 if (disk->flags & GENHD_FL_UP) {
2832 blk_cleanup_queue(disk->queue);
/*
 * Synchronously read a byte range of an osd object into "buf".
 * Returns the number of bytes read on success, negative errno on error
 * (NOTE(review): some return-path lines are elided in this extract).
 */
2837 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2838 const char *object_name,
2839 u64 offset, u64 length, void *buf)
2842 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2843 struct rbd_obj_request *obj_request;
2844 struct page **pages = NULL;
2849 page_count = (u32) calc_pages_for(offset, length);
2850 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2852 ret = PTR_ERR(pages);
2855 obj_request = rbd_obj_request_create(object_name, offset, length,
2860 obj_request->pages = pages;
2861 obj_request->page_count = page_count;
2863 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2864 if (!obj_request->osd_req)
2867 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2868 offset, length, 0, 0);
2869 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2871 obj_request->length,
2872 obj_request->offset & ~PAGE_MASK,
2874 rbd_osd_req_format_read(obj_request);
2876 ret = rbd_obj_request_submit(osdc, obj_request);
2879 ret = rbd_obj_request_wait(obj_request);
2883 ret = obj_request->result;
/* Copy however many bytes actually transferred into the caller's buffer. */
2887 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2888 size = (size_t) obj_request->xferred;
2889 ceph_copy_from_page_vector(pages, buf, 0, size);
2890 rbd_assert(size <= (size_t)INT_MAX);
2894 rbd_obj_request_put(obj_request);
2896 ceph_release_page_vector(pages, page_count);
2902 * Read the complete header for the given rbd device.
2904 * Returns a pointer to a dynamically-allocated buffer containing
2905 * the complete and validated header. Caller can pass the address
2906 * of a variable that will be filled in with the version of the
2907 * header object at the time it was read.
2909 * Returns a pointer-coded errno if a failure occurs.
2911 static struct rbd_image_header_ondisk *
2912 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2914 struct rbd_image_header_ondisk *ondisk = NULL;
2921 * The complete header will include an array of its 64-bit
2922 * snapshot ids, followed by the names of those snapshots as
2923 * a contiguous block of NUL-terminated strings. Note that
2924 * the number of snapshots could change by the time we read
2925 * it in, in which case we re-read it.
/* Size the buffer for the fixed header plus per-snapshot records. */
2932 size = sizeof (*ondisk);
2933 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2935 ondisk = kmalloc(size, GFP_KERNEL);
2937 return ERR_PTR(-ENOMEM);
2939 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2943 if ((size_t)ret < size) {
2945 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2949 if (!rbd_dev_ondisk_valid(ondisk)) {
2951 rbd_warn(rbd_dev, "invalid header");
/* Re-read if the snapshot count changed while we were reading. */
2955 names_size = le64_to_cpu(ondisk->snap_names_len);
2956 want_count = snap_count;
2957 snap_count = le32_to_cpu(ondisk->snap_count);
2958 } while (snap_count != want_count);
2965 return ERR_PTR(ret);
2969 * reload the ondisk header
/* Read the on-disk v1 header and decode it into "header". */
2971 static int rbd_read_header(struct rbd_device *rbd_dev,
2972 struct rbd_image_header *header)
2974 struct rbd_image_header_ondisk *ondisk;
2977 ondisk = rbd_dev_v1_header_read(rbd_dev);
2979 return PTR_ERR(ondisk);
2980 ret = rbd_header_from_disk(header, ondisk);
/* Destroy every snapshot on the device's snapshot list. */
2986 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2988 struct rbd_snap *snap;
2989 struct rbd_snap *next;
/* _safe variant: each node is deleted while iterating. */
2991 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2992 list_del(&snap->node);
2993 rbd_snap_destroy(snap);
/*
 * If the base image (not a snapshot) is mapped and its size changed,
 * propagate the new size to the mapping and the gendisk capacity.
 */
2997 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2999 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3002 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3005 rbd_dev->mapping.size = rbd_dev->header.image_size;
3006 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3007 dout("setting size to %llu sectors", (unsigned long long)size);
3008 set_capacity(rbd_dev->disk, size);
3013 * only read the first part of the ondisk header, without the snaps info
3015 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3018 struct rbd_image_header h;
3020 ret = rbd_read_header(rbd_dev, &h);
/* Serialize header updates against readers of rbd_dev->header. */
3024 down_write(&rbd_dev->header_rwsem);
3026 /* Update image size, and check for resize of mapped image */
3027 rbd_dev->header.image_size = h.image_size;
3028 rbd_update_mapping_size(rbd_dev);
3030 /* rbd_dev->header.object_prefix shouldn't change */
3031 kfree(rbd_dev->header.snap_sizes);
3032 kfree(rbd_dev->header.snap_names);
3033 /* osd requests may still refer to snapc */
3034 ceph_put_snap_context(rbd_dev->header.snapc);
3036 rbd_dev->header.image_size = h.image_size;
3037 rbd_dev->header.snapc = h.snapc;
3038 rbd_dev->header.snap_names = h.snap_names;
3039 rbd_dev->header.snap_sizes = h.snap_sizes;
3040 /* Free the extra copy of the object prefix */
3041 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3042 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3043 kfree(h.object_prefix);
3045 ret = rbd_dev_snaps_update(rbd_dev);
3047 up_write(&rbd_dev->header_rwsem);
/*
 * Refresh the device's image metadata, dispatching to the v1 or v2
 * implementation, and revalidate the disk if the image size changed.
 */
3052 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3057 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3058 image_size = rbd_dev->header.image_size;
3059 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3060 if (rbd_dev->image_format == 1)
3061 ret = rbd_dev_v1_refresh(rbd_dev);
3063 ret = rbd_dev_v2_refresh(rbd_dev);
3064 mutex_unlock(&ctl_mutex);
3066 rbd_warn(rbd_dev, "got notification but failed to "
3067 " update snaps: %d\n", ret);
3068 if (image_size != rbd_dev->header.image_size)
3069 revalidate_disk(rbd_dev->disk);
/*
 * Allocate and configure the gendisk and request queue for this
 * rbd device; I/O limits are sized to one rbd object.
 */
3074 static int rbd_init_disk(struct rbd_device *rbd_dev)
3076 struct gendisk *disk;
3077 struct request_queue *q;
3080 /* create gendisk info */
3081 disk = alloc_disk(RBD_MINORS_PER_MAJOR)
3085 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3087 disk->major = rbd_dev->major;
3088 disk->first_minor = 0;
3089 disk->fops = &rbd_bd_ops;
3090 disk->private_data = rbd_dev;
3092 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3096 /* We use the default size, but let's be explicit about it. */
3097 blk_queue_physical_block_size(q, SECTOR_SIZE);
3099 /* set io sizes to object size */
3100 segment_size = rbd_obj_bytes(&rbd_dev->header);
3101 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3102 blk_queue_max_segment_size(q, segment_size);
3103 blk_queue_io_min(q, segment_size);
3104 blk_queue_io_opt(q, segment_size);
/* Prevent bios from spanning object boundaries (see rbd_merge_bvec). */
3106 blk_queue_merge_bvec(q, rbd_merge_bvec);
3109 q->queuedata = rbd_dev;
3111 rbd_dev->disk = disk;
/* Map an embedded struct device back to its owning rbd_device. */
3124 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3126 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes. */
3129 static ssize_t rbd_size_show(struct device *dev,
3130 struct device_attribute *attr, char *buf)
3132 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3134 return sprintf(buf, "%llu\n",
3135 (unsigned long long)rbd_dev->mapping.size);
3139 * Note this shows the features for whatever's mapped, which is not
3140 * necessarily the base image.
3142 static ssize_t rbd_features_show(struct device *dev,
3143 struct device_attribute *attr, char *buf)
3145 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3147 return sprintf(buf, "0x%016llx\n",
3148 (unsigned long long)rbd_dev->mapping.features);
/* sysfs "major": device major number, or "(none)" if unset. */
3151 static ssize_t rbd_major_show(struct device *dev,
3152 struct device_attribute *attr, char *buf)
3154 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3157 return sprintf(buf, "%d\n", rbd_dev->major);
3159 return sprintf(buf, "(none)\n");
/* sysfs "client_id": ceph client instance id for this device. */
3163 static ssize_t rbd_client_id_show(struct device *dev,
3164 struct device_attribute *attr, char *buf)
3166 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3168 return sprintf(buf, "client%lld\n",
3169 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the pool holding the image. */
3172 static ssize_t rbd_pool_show(struct device *dev,
3173 struct device_attribute *attr, char *buf)
3175 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3177 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
/* sysfs "pool_id": numeric id of the pool holding the image. */
3180 static ssize_t rbd_pool_id_show(struct device *dev,
3181 struct device_attribute *attr, char *buf)
3183 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3185 return sprintf(buf, "%llu\n",
3186 (unsigned long long) rbd_dev->spec->pool_id);
/* sysfs "name": image name, or "(unknown)" if it couldn't be found. */
3189 static ssize_t rbd_name_show(struct device *dev,
3190 struct device_attribute *attr, char *buf)
3192 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3194 if (rbd_dev->spec->image_name)
3195 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3197 return sprintf(buf, "(unknown)\n");
/* sysfs "image_id": the image's unique id string. */
3200 static ssize_t rbd_image_id_show(struct device *dev,
3201 struct device_attribute *attr, char *buf)
3203 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3205 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3209 * Shows the name of the currently-mapped snapshot (or
3210 * RBD_SNAP_HEAD_NAME for the base image).
3212 static ssize_t rbd_snap_show(struct device *dev,
3213 struct device_attribute *attr,
3216 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3218 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3222 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3223 * for the parent image. If there is no parent, simply shows
3224 * "(no parent image)".
3226 static ssize_t rbd_parent_show(struct device *dev,
3227 struct device_attribute *attr,
3230 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3231 struct rbd_spec *spec = rbd_dev->parent_spec;
3236 return sprintf(buf, "(no parent image)\n");
/* bufp advances past each sprintf'd section; total is bufp - buf. */
3238 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3239 (unsigned long long) spec->pool_id, spec->pool_name);
3244 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3245 spec->image_name ? spec->image_name : "(unknown)");
3250 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3251 (unsigned long long) spec->snap_id, spec->snap_name);
3256 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3261 return (ssize_t) (bufp - buf);
/* sysfs "refresh" (write-only): force a metadata refresh of the image. */
3264 static ssize_t rbd_image_refresh(struct device *dev,
3265 struct device_attribute *attr,
3269 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3272 ret = rbd_dev_refresh(rbd_dev);
3274 return ret < 0 ? ret : size;
/*
 * sysfs attribute declarations for an rbd device, collected into an
 * attribute group and hung off rbd_device_type.  All are read-only
 * except "refresh", which is write-only (S_IWUSR).
 */
3277 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3278 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3279 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3280 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3281 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3282 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3283 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3284 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3285 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3286 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3287 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3289 static struct attribute *rbd_attrs[] = {
3290 &dev_attr_size.attr,
3291 &dev_attr_features.attr,
3292 &dev_attr_major.attr,
3293 &dev_attr_client_id.attr,
3294 &dev_attr_pool.attr,
3295 &dev_attr_pool_id.attr,
3296 &dev_attr_name.attr,
3297 &dev_attr_image_id.attr,
3298 &dev_attr_current_snap.attr,
3299 &dev_attr_parent.attr,
3300 &dev_attr_refresh.attr,
3304 static struct attribute_group rbd_attr_group = {
3308 static const struct attribute_group *rbd_attr_groups[] = {
3313 static void rbd_sysfs_dev_release(struct device *dev)
3317 static struct device_type rbd_device_type = {
3319 .groups = rbd_attr_groups,
3320 .release = rbd_sysfs_dev_release,
/* Take a reference on an rbd spec. */
3323 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3325 kref_get(&spec->kref);
/* Drop a reference on an rbd spec; frees it when the count hits zero. */
3330 static void rbd_spec_free(struct kref *kref);
3331 static void rbd_spec_put(struct rbd_spec *spec)
3334 kref_put(&spec->kref, rbd_spec_free);
/* Allocate a zeroed rbd spec with an initial reference. */
3337 static struct rbd_spec *rbd_spec_alloc(void)
3339 struct rbd_spec *spec;
3341 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3344 kref_init(&spec->kref);
/* kref release callback: free an rbd spec and its owned name strings. */
3349 static void rbd_spec_free(struct kref *kref)
3351 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3353 kfree(spec->pool_name);
3354 kfree(spec->image_id);
3355 kfree(spec->image_name);
3356 kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd_device, taking ownership of the
 * client reference and spec supplied by the caller.
 */
3360 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3361 struct rbd_spec *spec)
3363 struct rbd_device *rbd_dev;
3365 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3369 spin_lock_init(&rbd_dev->lock);
3371 INIT_LIST_HEAD(&rbd_dev->node);
3372 INIT_LIST_HEAD(&rbd_dev->snaps);
3373 init_rwsem(&rbd_dev->header_rwsem);
3375 rbd_dev->spec = spec;
3376 rbd_dev->rbd_client = rbdc;
3378 /* Initialize the layout used for all rbd requests */
3380 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3381 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3382 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3383 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/* Release the client and spec references held by an rbd_device. */
3388 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3390 rbd_put_client(rbd_dev->rbd_client);
3391 rbd_spec_put(rbd_dev->spec);
/* Free a snapshot entry (body elided in this extract). */
3395 static void rbd_snap_destroy(struct rbd_snap *snap)
/*
 * Allocate a snapshot entry; takes ownership of "snap_name".
 * Returns ERR_PTR(-ENOMEM) on allocation failure.
 */
3401 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3402 const char *snap_name,
3403 u64 snap_id, u64 snap_size,
3406 struct rbd_snap *snap;
3408 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3410 return ERR_PTR(-ENOMEM);
3412 snap->name = snap_name;
3414 snap->size = snap_size;
3415 snap->features = snap_features;
3421 * Returns a dynamically-allocated snapshot name if successful, or a
3422 * pointer-coded error otherwise.
3424 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3425 u64 *snap_size, u64 *snap_features)
3427 const char *snap_name;
3430 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3432 /* Skip over names until we find the one we are looking for */
3434 snap_name = rbd_dev->header.snap_names;
3435 for (i = 0; i < which; i++)
3436 snap_name += strlen(snap_name) + 1;
3438 snap_name = kstrdup(snap_name, GFP_KERNEL);
3440 return ERR_PTR(-ENOMEM);
3442 *snap_size = rbd_dev->header.snap_sizes[which];
3443 *snap_features = 0; /* No features for v1 */
3449 * Get the size and object order for an image snapshot, or if
3450 * snap_id is CEPH_NOSNAP, gets this information for the base
3453 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3454 u8 *order, u64 *snap_size)
3456 __le64 snapid = cpu_to_le64(snap_id);
/* Packed to match the wire encoding of the "get_size" reply. */
3461 } __attribute__ ((packed)) size_buf = { 0 };
3463 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3465 &snapid, sizeof (snapid),
3466 &size_buf, sizeof (size_buf), NULL);
3467 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3470 if (ret < sizeof (size_buf))
3474 *order = size_buf.order;
3475 *snap_size = le64_to_cpu(size_buf.size);
3477 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3478 (unsigned long long)snap_id, (unsigned int)*order,
3479 (unsigned long long)*snap_size);
/* Fetch the base image's object order and current size (v2 format). */
3484 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3486 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3487 &rbd_dev->header.obj_order,
3488 &rbd_dev->header.image_size);
/*
 * Fetch and decode the v2 image's object-name prefix via the
 * "get_object_prefix" class method on the header object.
 */
3491 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3497 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3501 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3502 "rbd", "get_object_prefix", NULL, 0,
3503 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3504 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3509 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3510 p + ret, NULL, GFP_NOIO);
3513 if (IS_ERR(rbd_dev->header.object_prefix)) {
3514 ret = PTR_ERR(rbd_dev->header.object_prefix);
3515 rbd_dev->header.object_prefix = NULL;
3517 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for a snapshot (or the base image when
 * snap_id is CEPH_NOSNAP); rejects images with unsupported
 * incompatible features.
 */
3525 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3528 __le64 snapid = cpu_to_le64(snap_id);
/* Packed to match the wire encoding of the "get_features" reply. */
3532 } __attribute__ ((packed)) features_buf = { 0 };
3536 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3537 "rbd", "get_features",
3538 &snapid, sizeof (snapid),
3539 &features_buf, sizeof (features_buf), NULL);
3540 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3543 if (ret < sizeof (features_buf))
3546 incompat = le64_to_cpu(features_buf.incompat);
3547 if (incompat & ~RBD_FEATURES_SUPPORTED)
3550 *snap_features = le64_to_cpu(features_buf.features);
3552 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3553 (unsigned long long)snap_id,
3554 (unsigned long long)*snap_features,
3555 (unsigned long long)le64_to_cpu(features_buf.incompat));
/* Fetch the base image's feature bits (v2 format). */
3560 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3562 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3563 &rbd_dev->header.features);
/*
 * Query the "get_parent" class method and, if the image has a parent,
 * record the parent's spec and the overlap with it on rbd_dev.
 */
3566 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3568 struct rbd_spec *parent_spec;
3570 void *reply_buf = NULL;
3578 parent_spec = rbd_spec_alloc();
/* Reply layout: pool_id, encoded image_id string, snap_id, overlap. */
3582 size = sizeof (__le64) + /* pool_id */
3583 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3584 sizeof (__le64) + /* snap_id */
3585 sizeof (__le64); /* overlap */
3586 reply_buf = kmalloc(size, GFP_KERNEL);
3592 snapid = cpu_to_le64(CEPH_NOSNAP);
3593 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3594 "rbd", "get_parent",
3595 &snapid, sizeof (snapid),
3596 reply_buf, size, NULL);
3597 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3602 end = reply_buf + ret;
3604 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3605 if (parent_spec->pool_id == CEPH_NOPOOL)
3606 goto out; /* No parent? No problem. */
3608 /* The ceph file layout needs to fit pool id in 32 bits */
3611 if (parent_spec->pool_id > (u64)U32_MAX) {
3612 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3613 (unsigned long long)parent_spec->pool_id, U32_MAX);
3617 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3618 if (IS_ERR(image_id)) {
3619 ret = PTR_ERR(image_id);
3622 parent_spec->image_id = image_id;
3623 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3624 ceph_decode_64_safe(&p, end, overlap, out_err);
3626 rbd_dev->parent_overlap = overlap;
3627 rbd_dev->parent_spec = parent_spec;
3628 parent_spec = NULL; /* rbd_dev now owns this */
3633 rbd_spec_put(parent_spec);
/*
 * Query "get_stripe_unit_count" and accept the image only if its
 * striping parameters match the non-fancy defaults we support.
 */
3638 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3642 __le64 stripe_count;
3643 } __attribute__ ((packed)) striping_info_buf = { 0 };
3644 size_t size = sizeof (striping_info_buf);
3651 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3652 "rbd", "get_stripe_unit_count", NULL, 0,
3653 (char *)&striping_info_buf, size, NULL);
3654 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3661 * We don't actually support the "fancy striping" feature
3662 * (STRIPINGV2) yet, but if the striping sizes are the
3663 * defaults the behavior is the same as before. So find
3664 * out, and only fail if the image has non-default values.
3667 obj_size = (u64)1 << rbd_dev->header.obj_order;
3668 p = &striping_info_buf;
3669 stripe_unit = ceph_decode_64(&p);
3670 if (stripe_unit != obj_size) {
3671 rbd_warn(rbd_dev, "unsupported stripe unit "
3672 "(got %llu want %llu)",
3673 stripe_unit, obj_size);
3676 stripe_count = ceph_decode_64(&p);
3677 if (stripe_count != 1) {
3678 rbd_warn(rbd_dev, "unsupported stripe count "
3679 "(got %llu want 1)", stripe_count);
3682 rbd_dev->header.stripe_unit = stripe_unit;
3683 rbd_dev->header.stripe_count = stripe_count;
/*
 * Look up an image's name from its id via the rbd directory object's
 * "dir_get_name" method.  Returns a dynamically-allocated name, or
 * NULL-ish/error on failure (NOTE(review): cleanup lines elided here).
 */
3688 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3690 size_t image_id_size;
3695 void *reply_buf = NULL;
3697 char *image_name = NULL;
3700 rbd_assert(!rbd_dev->spec->image_name);
/* Build the length-prefixed encoded image id used as the argument. */
3702 len = strlen(rbd_dev->spec->image_id);
3703 image_id_size = sizeof (__le32) + len;
3704 image_id = kmalloc(image_id_size, GFP_KERNEL);
3709 end = image_id + image_id_size;
3710 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3712 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3713 reply_buf = kmalloc(size, GFP_KERNEL);
3717 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3718 "rbd", "dir_get_name",
3719 image_id, image_id_size,
3720 reply_buf, size, NULL);
3724 end = reply_buf + ret;
3726 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3727 if (IS_ERR(image_name))
3730 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3739 * When an rbd image has a parent image, it is identified by the
3740 * pool, image, and snapshot ids (not names). This function fills
3741 * in the names for those ids. (It's OK if we can't figure out the
3742 * name for an image id, but the pool and snapshot ids should always
3743 * exist and have names.) All names in an rbd spec are dynamically
3746 * When an image being mapped (not a parent) is probed, we have the
3747 * pool name and pool id, image name and image id, and the snapshot
3748 * name. The only thing we're missing is the snapshot id.
3750 * The set of snapshots for an image is not known until they have
3751 * been read by rbd_dev_snaps_update(), so we can't completely fill
3752 * in this information until after that has been called.
3754 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3756 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3757 struct rbd_spec *spec = rbd_dev->spec;
3758 const char *pool_name;
3759 const char *image_name;
3760 const char *snap_name;
3764 * An image being mapped will have the pool name (etc.), but
3765 * we need to look up the snapshot id.
3767 if (spec->pool_name) {
3768 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3769 struct rbd_snap *snap;
3771 snap = snap_by_name(rbd_dev, spec->snap_name);
3774 spec->snap_id = snap->id;
3776 spec->snap_id = CEPH_NOSNAP;
3782 /* Get the pool name; we have to make our own copy of this */
3784 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3786 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3789 pool_name = kstrdup(pool_name, GFP_KERNEL);
3793 /* Fetch the image name; tolerate failure here */
3795 image_name = rbd_dev_image_name(rbd_dev);
3797 rbd_warn(rbd_dev, "unable to get image name");
3799 /* Look up the snapshot name, and make a copy */
3801 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3803 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3807 snap_name = kstrdup(snap_name, GFP_KERNEL);
/* Spec now owns all three dynamically-allocated names. */
3813 spec->pool_name = pool_name;
3814 spec->image_name = image_name;
3815 spec->snap_name = snap_name;
/*
 * Fetch and decode the image's snapshot context ("get_snapcontext"),
 * installing the resulting ceph_snap_context on the header.
 */
3825 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3834 struct ceph_snap_context *snapc;
3838 * We'll need room for the seq value (maximum snapshot id),
3839 * snapshot count, and array of that many snapshot ids.
3840 * For now we have a fixed upper limit on the number we're
3841 * prepared to receive.
3843 size = sizeof (__le64) + sizeof (__le32) +
3844 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3845 reply_buf = kzalloc(size, GFP_KERNEL);
3849 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3850 "rbd", "get_snapcontext", NULL, 0,
3851 reply_buf, size, NULL);
3852 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3857 end = reply_buf + ret;
3859 ceph_decode_64_safe(&p, end, seq, out);
3860 ceph_decode_32_safe(&p, end, snap_count, out);
3863 * Make sure the reported number of snapshot ids wouldn't go
3864 * beyond the end of our buffer. But before checking that,
3865 * make sure the computed size of the snapshot context we
3866 * allocate is representable in a size_t.
3868 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3873 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3877 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3883 for (i = 0; i < snap_count; i++)
3884 snapc->snaps[i] = ceph_decode_64(&p);
3886 rbd_dev->header.snapc = snapc;
3888 dout(" snap context seq = %llu, snap_count = %u\n",
3889 (unsigned long long)seq, (unsigned int)snap_count);
/*
 * Fetch the name of the snapshot at index "which" in the snapshot
 * context via "get_snapshot_name".  Returns a dynamically-allocated
 * name or a pointer-coded error.
 */
3896 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3906 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3907 reply_buf = kmalloc(size, GFP_KERNEL);
3909 return ERR_PTR(-ENOMEM);
3911 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3912 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3913 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3914 "rbd", "get_snapshot_name",
3915 &snap_id, sizeof (snap_id),
3916 reply_buf, size, NULL);
3917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3919 snap_name = ERR_PTR(ret);
3924 end = reply_buf + ret;
3925 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3926 if (IS_ERR(snap_name))
3929 dout(" snap_id 0x%016llx snap_name = %s\n",
3930 (unsigned long long)le64_to_cpu(snap_id), snap_name);
/*
 * Gather the size, features, and name of snapshot "which" for a
 * format 2 image.  On success returns the (allocated) snapshot name
 * and fills in *snap_size and *snap_features; on failure returns an
 * ERR_PTR() and leaves the outputs untouched.
 */
3937 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3938 u64 *snap_size, u64 *snap_features)
3943 const char *snap_name;
3946 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3947 snap_id = rbd_dev->header.snapc->snaps[which];
3948 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3952 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3956 snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3957 if (!IS_ERR(snap_name)) {
/* Only report results once the name lookup has also succeeded */
3959 *snap_features = features;
3964 return ERR_PTR(ret);
/*
 * Format-dispatching wrapper: route a snapshot-info request to the
 * v1 or v2 implementation based on the image format.  Any other
 * format value is rejected with -EINVAL.
 */
3967 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3968 u64 *snap_size, u64 *snap_features)
3970 if (rbd_dev->image_format == 1)
3971 return rbd_dev_v1_snap_info(rbd_dev, which,
3972 snap_size, snap_features);
3973 if (rbd_dev->image_format == 2)
3974 return rbd_dev_v2_snap_info(rbd_dev, which,
3975 snap_size, snap_features);
3976 return ERR_PTR(-EINVAL);
/*
 * Refresh a format 2 image: re-read its size and snapshot context
 * from the OSDs and resynchronize the device's snapshot list.
 * Header updates are serialized under header_rwsem.
 */
3979 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3983 down_write(&rbd_dev->header_rwsem);
3985 ret = rbd_dev_v2_image_size(rbd_dev);
/* Propagate any size change to the mapped (block-layer) size */
3988 rbd_update_mapping_size(rbd_dev);
3990 ret = rbd_dev_v2_snap_context(rbd_dev);
3991 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3994 ret = rbd_dev_snaps_update(rbd_dev);
3995 dout("rbd_dev_snaps_update returned %d\n", ret);
3999 up_write(&rbd_dev->header_rwsem);
4005 * Scan the rbd device's current snapshot list and compare it to the
4006 * newly-received snapshot context. Remove any existing snapshots
4007 * not present in the new snapshot context. Add a new snapshot for
4008 * any snapshots in the snapshot context not in the current list.
4009 * And verify there are no changes to snapshots we already know
4012 * Assumes the snapshots in the snapshot context are sorted by
4013 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4014 * are also maintained in that order.)
4016 * Note that any error that occurs while updating the snapshot list
4017 * aborts the update, and the entire list is cleared. The snapshot
4018 * list becomes inconsistent at that point anyway, so it might as
/*
 * Merge-walk the device's sorted snapshot list in lockstep with the
 * (also sorted) snapshot context, removing, adding, or verifying
 * entries as needed.  See the block comment above for the contract.
 */
4021 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4023 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4024 const u32 snap_count = snapc->num_snaps;
4025 struct list_head *head = &rbd_dev->snaps;
4026 struct list_head *links = head->next;
4030 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
/* Loop until both the context and the existing list are exhausted */
4031 while (index < snap_count || links != head) {
4033 struct rbd_snap *snap;
4034 const char *snap_name;
4036 u64 snap_features = 0;
/* CEPH_NOSNAP marks "no more ids" once the context runs out */
4038 snap_id = index < snap_count ? snapc->snaps[index]
4040 snap = links != head ? list_entry(links, struct rbd_snap, node)
4042 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4044 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4045 struct list_head *next = links->next;
4048 * A previously-existing snapshot is not in
4049 * the new snap context.
4051 * If the now-missing snapshot is the one
4052 * the image represents, clear its existence
4053 * flag so we can avoid sending any more
4056 if (rbd_dev->spec->snap_id == snap->id)
4057 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4058 dout("removing %ssnap id %llu\n",
4059 rbd_dev->spec->snap_id == snap->id ?
4061 (unsigned long long)snap->id);
4063 list_del(&snap->node);
4064 rbd_snap_destroy(snap);
4066 /* Done with this list entry; advance */
4072 snap_name = rbd_dev_snap_info(rbd_dev, index,
4073 &snap_size, &snap_features);
4074 if (IS_ERR(snap_name)) {
4075 ret = PTR_ERR(snap_name);
4076 dout("failed to get snap info, error %d\n", ret);
/* NOTE(review): prints snap_count rather than the entry index — verify intent */
4080 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4081 (unsigned long long)snap_id);
4082 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4083 struct rbd_snap *new_snap;
4085 /* We haven't seen this snapshot before */
4087 new_snap = rbd_snap_create(rbd_dev, snap_name,
4088 snap_id, snap_size, snap_features);
4089 if (IS_ERR(new_snap)) {
4090 ret = PTR_ERR(new_snap);
4091 dout(" failed to add dev, error %d\n", ret);
4095 /* New goes before existing, or at end of list */
/* NOTE(review): " at end\n" embeds a second newline in the message — verify intended */
4097 dout(" added dev%s\n", snap ? "" : " at end\n");
4099 list_add_tail(&new_snap->node, &snap->node);
4101 list_add_tail(&new_snap->node, head);
4103 /* Already have this one */
4105 dout(" already present\n");
/* A known snapshot must be unchanged in size, name, and features */
4107 rbd_assert(snap->size == snap_size);
4108 rbd_assert(!strcmp(snap->name, snap_name));
4109 rbd_assert(snap->features == snap_features);
4111 /* Done with this list entry; advance */
4113 links = links->next;
4116 /* Advance to the next entry in the snapshot context */
4120 dout("%s: done\n", __func__);
/* On error the whole list is dropped (see block comment above) */
4124 rbd_remove_all_snaps(rbd_dev);
/*
 * Register the rbd device with the driver core on the rbd bus,
 * under ctl_mutex.  The sysfs name is simply the numeric dev id.
 */
4129 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4134 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4136 dev = &rbd_dev->dev;
4137 dev->bus = &rbd_bus_type;
4138 dev->type = &rbd_device_type;
4139 dev->parent = &rbd_root_dev;
4140 dev->release = rbd_dev_device_release;
4141 dev_set_name(dev, "%d", rbd_dev->dev_id);
4142 ret = device_register(dev);
4144 mutex_unlock(&ctl_mutex);
/* Undo rbd_bus_add_dev(): remove the device from the driver core */
4149 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4151 device_unregister(&rbd_dev->dev);
/* Highest device id handed out so far; see rbd_dev_id_get()/_put() */
4154 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4157 * Get a unique rbd identifier for the given new rbd_dev, and add
4158 * the rbd_dev to the global list. The minimum rbd id is 1.
4160 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
/* atomic64_inc_return() yields 1 first time, so the minimum id is 1 */
4162 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4164 spin_lock(&rbd_dev_list_lock);
4165 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4166 spin_unlock(&rbd_dev_list_lock);
4167 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4168 (unsigned long long) rbd_dev->dev_id);
4172 * Remove an rbd_dev from the global list, and record that its
4173 * identifier is no longer in use.
4175 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4177 struct list_head *tmp;
4178 int rbd_id = rbd_dev->dev_id;
4181 rbd_assert(rbd_id > 0);
4183 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4184 (unsigned long long) rbd_dev->dev_id);
4185 spin_lock(&rbd_dev_list_lock);
4186 list_del_init(&rbd_dev->node);
4189 * If the id being "put" is not the current maximum, there
4190 * is nothing special we need to do.
4192 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4193 spin_unlock(&rbd_dev_list_lock);
4198 * We need to update the current maximum id. Search the
4199 * list to find out what it is. We're more likely to find
4200 * the maximum at the end, so search the list backward.
4203 list_for_each_prev(tmp, &rbd_dev_list) {
4204 struct rbd_device *rbd_dev;
4206 rbd_dev = list_entry(tmp, struct rbd_device, node);
4207 if (rbd_dev->dev_id > max_id)
4208 max_id = rbd_dev->dev_id;
4210 spin_unlock(&rbd_dev_list_lock);
4213 * The max id could have been updated by rbd_dev_id_get(), in
4214 * which case it now accurately reflects the new maximum.
4215 * Be careful not to overwrite the maximum value in that
/* cmpxchg only installs max_id if rbd_dev_id_max is still rbd_id */
4218 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
/* NOTE(review): printed even when the cmpxchg did not update the max — verify */
4219 dout(" max dev id has been reset\n")
4223 * Skips over white space at *buf, and updates *buf to point to the
4224 * first found non-space character (if any). Returns the length of
4225 * the token (string of non-white space characters) found. Note
4226 * that *buf must be terminated with '\0'.
/*
 * Skip leading white space at *buf, leaving *buf pointing at the
 * first non-space character (if any), and return the length of the
 * token (run of non-space characters) that starts there.  The
 * string at *buf must be NUL-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters for which isspace() is nonzero
	 * in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	size_t skipped = strspn(*buf, spaces);

	*buf += skipped;		/* Advance to start of token */

	return strcspn(*buf, spaces);	/* Length of the token found */
}
4242 * Finds the next token in *buf, and if the provided token buffer is
4243 * big enough, copies the found token into it. The result, if
4244 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4245 * must be terminated with '\0' on entry.
4247 * Returns the length of the token found (not including the '\0').
4248 * Return value will be 0 if no token is found, and it will be >=
4249 * token_size if the token would not fit.
4251 * The *buf pointer will be updated to point beyond the end of the
4252 * found token. Note that this occurs even if the token buffer is
4253 * too small to hold it.
/*
 * Copy the next token into the caller's buffer when it fits,
 * always NUL-terminating the copy.  See the block comment above
 * for the full contract (return value, *buf advancement).
 */
4255 static inline size_t copy_token(const char **buf,
4261 len = next_token(buf);
/* Copy only if the token plus its terminator fits in token_size */
4262 if (len < token_size) {
4263 memcpy(token, *buf, len);
4264 *(token + len) = '\0';
4272 * Finds the next token in *buf, dynamically allocates a buffer big
4273 * enough to hold a copy of it, and copies the token into the new
4274 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4275 * that a duplicate buffer is created even for a zero-length token.
4277 * Returns a pointer to the newly-allocated duplicate, or a null
4278 * pointer if memory for the duplicate was not available. If
4279 * the lenp argument is a non-null pointer, the length of the token
4280 * (not including the '\0') is returned in *lenp.
4282 * If successful, the *buf pointer will be updated to point beyond
4283 * the end of the found token.
4285 * Note: uses GFP_KERNEL for allocation.
/* Duplicate the next token (see block comment above); GFP_KERNEL alloc */
4287 static inline char *dup_token(const char **buf, size_t *lenp)
4292 len = next_token(buf);
/* len + 1 leaves room for the terminating NUL written below */
4293 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4296 *(dup + len) = '\0';
4306 * Parse the options provided for an "rbd add" (i.e., rbd image
4307 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4308 * and the data written is passed here via a NUL-terminated buffer.
4309 * Returns 0 if successful or an error code otherwise.
4311 * The information extracted from these options is recorded in
4312 * the other parameters which return dynamically-allocated
4315 * The address of a pointer that will refer to a ceph options
4316 * structure. Caller must release the returned pointer using
4317 * ceph_destroy_options() when it is no longer needed.
4319 * Address of an rbd options pointer. Fully initialized by
4320 * this function; caller must release with kfree().
4322 * Address of an rbd image specification pointer. Fully
4323 * initialized by this function based on parsed options.
4324 * Caller must release with rbd_spec_put().
4326 * The options passed take this form:
4327 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4330 * A comma-separated list of one or more monitor addresses.
4331 * A monitor address is an ip address, optionally followed
4332 * by a port number (separated by a colon).
4333 * I.e.: ip1[:port1][,ip2[:port2]...]
4335 * A comma-separated list of ceph and/or rbd options.
4337 * The name of the rados pool containing the rbd image.
4339 * The name of the image in that pool to map.
4341 * An optional snapshot id. If provided, the mapping will
4342 * present data from the image at the time that snapshot was
4343 * created. The image head is used if no snapshot id is
4344 * provided. Snapshot mappings are always read-only.
/*
 * Parse an "rbd add" request string.  See the big block comment
 * above for the option format and the ownership of the returned
 * ceph_opts / opts / rbd_spec pointers.
 */
4346 static int rbd_add_parse_args(const char *buf,
4347 struct ceph_options **ceph_opts,
4348 struct rbd_options **opts,
4349 struct rbd_spec **rbd_spec)
4353 const char *mon_addrs;
4355 size_t mon_addrs_size;
4356 struct rbd_spec *spec = NULL;
4357 struct rbd_options *rbd_opts = NULL;
4358 struct ceph_options *copts;
4361 /* The first four tokens are required */
4363 len = next_token(&buf);
4365 rbd_warn(NULL, "no monitor address(es) provided");
/* +1 accounts for the terminator passed to ceph_parse_options() below */
4369 mon_addrs_size = len + 1;
4373 options = dup_token(&buf, NULL);
4377 rbd_warn(NULL, "no options provided");
4381 spec = rbd_spec_alloc();
4385 spec->pool_name = dup_token(&buf, NULL);
4386 if (!spec->pool_name)
4388 if (!*spec->pool_name) {
4389 rbd_warn(NULL, "no pool name provided");
4393 spec->image_name = dup_token(&buf, NULL);
4394 if (!spec->image_name)
4396 if (!*spec->image_name) {
4397 rbd_warn(NULL, "no image name provided");
4402 * Snapshot name is optional; default is to use "-"
4403 * (indicating the head/no snapshot).
4405 len = next_token(&buf);
4407 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4408 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4409 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4410 ret = -ENAMETOOLONG;
4413 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4416 *(snap_name + len) = '\0';
4417 spec->snap_name = snap_name;
4419 /* Initialize all rbd options to the defaults */
4421 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4425 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4427 copts = ceph_parse_options(options, mon_addrs,
4428 mon_addrs + mon_addrs_size - 1,
4429 parse_rbd_opts_token, rbd_opts);
4430 if (IS_ERR(copts)) {
4431 ret = PTR_ERR(copts);
4452 * An rbd format 2 image has a unique identifier, distinct from the
4453 * name given to it by the user. Internally, that identifier is
4454 * what's used to specify the names of objects related to the image.
4456 * A special "rbd id" object is used to map an rbd image name to its
4457 * id. If that object doesn't exist, then there is no v2 rbd image
4458 * with the supplied name.
4460 * This function will record the given rbd_dev's image_id field if
4461 * it can be determined, and in that case will return 0. If any
4462 * errors occur a negative errno will be returned and the rbd_dev's
4463 * image_id field will be unchanged (and should be NULL).
4465 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4474 * When probing a parent image, the image id is already
4475 * known (and the image name likely is not). There's no
4476 * need to fetch the image id again in this case. We
4477 * do still need to set the image format though.
4479 if (rbd_dev->spec->image_id) {
/* An empty image id string marks a format 1 image */
4480 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4486 * First, see if the format 2 image id file exists, and if
4487 * so, get the image's persistent id from it.
4489 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4490 object_name = kmalloc(size, GFP_NOIO);
4493 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4494 dout("rbd id object name is %s\n", object_name);
4496 /* Response will be an encoded string, which includes a length */
4498 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4499 response = kzalloc(size, GFP_NOIO);
4505 /* If it doesn't exist we'll assume it's a format 1 image */
4507 ret = rbd_obj_method_sync(rbd_dev, object_name,
4508 "rbd", "get_id", NULL, 0,
4509 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4510 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4511 if (ret == -ENOENT) {
/* No id object: record an empty id to mean "format 1" */
4512 image_id = kstrdup("", GFP_KERNEL);
4513 ret = image_id ? 0 : -ENOMEM;
4515 rbd_dev->image_format = 1;
4516 } else if (ret > sizeof (__le32)) {
4519 image_id = ceph_extract_encoded_string(&p, p + ret,
4521 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4523 rbd_dev->image_format = 2;
4529 rbd_dev->spec->image_id = image_id;
4530 dout("image_id is %s\n", image_id);
4539 /* Undo whatever state changes are made by v1 or v2 image probe */
4541 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4543 struct rbd_image_header *header;
/* Drop any parent chain and the reference we hold on its spec */
4545 rbd_dev_remove_parent(rbd_dev);
4546 rbd_spec_put(rbd_dev->parent_spec);
4547 rbd_dev->parent_spec = NULL;
4548 rbd_dev->parent_overlap = 0;
4550 /* Free dynamic fields from the header, then zero it out */
4552 header = &rbd_dev->header;
4553 ceph_put_snap_context(header->snapc);
4554 kfree(header->snap_sizes);
4555 kfree(header->snap_names);
4556 kfree(header->object_prefix);
4557 memset(header, 0, sizeof (*header));
/*
 * Probe a format 1 image: read its on-disk header to populate the
 * in-core image metadata.  On failure, release the header name and
 * image id recorded so far.
 */
4560 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4564 /* Populate rbd image metadata */
4566 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4570 /* Version 1 images have no parent (no layering) */
4572 rbd_dev->parent_spec = NULL;
4573 rbd_dev->parent_overlap = 0;
4575 dout("discovered version 1 image, header name is %s\n",
4576 rbd_dev->header_name);
/* Error path: undo state recorded before the probe */
4581 kfree(rbd_dev->header_name);
4582 rbd_dev->header_name = NULL;
4583 kfree(rbd_dev->spec->image_id);
4584 rbd_dev->spec->image_id = NULL;
/*
 * Probe a format 2 image: gather its size, object prefix, features,
 * optional parent/striping info, and snapshot context.  On failure,
 * unwind everything recorded so far.
 */
4589 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4593 ret = rbd_dev_v2_image_size(rbd_dev);
4597 /* Get the object prefix (a.k.a. block_name) for the image */
4599 ret = rbd_dev_v2_object_prefix(rbd_dev);
4603 /* Get and check the features for the image */
4605 ret = rbd_dev_v2_features(rbd_dev);
4609 /* If the image supports layering, get the parent info */
4611 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4612 ret = rbd_dev_v2_parent_info(rbd_dev);
4617 * Don't print a warning for parent images. We can
4618 * tell at this point because we won't know its pool
4619 * name yet (just its pool id).
4621 if (rbd_dev->spec->pool_name)
4622 rbd_warn(rbd_dev, "WARNING: kernel layering "
4623 "is EXPERIMENTAL!");
4626 /* If the image supports fancy striping, get its parameters */
4628 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4629 ret = rbd_dev_v2_striping_info(rbd_dev);
4634 /* crypto and compression type aren't (yet) supported for v2 images */
4636 rbd_dev->header.crypt_type = 0;
4637 rbd_dev->header.comp_type = 0;
4639 /* Get the snapshot context, plus the header version */
4641 ret = rbd_dev_v2_snap_context(rbd_dev);
4645 dout("discovered version 2 image, header name is %s\n",
4646 rbd_dev->header_name);
/* Error path: release everything gathered above */
4650 rbd_dev->parent_overlap = 0;
4651 rbd_spec_put(rbd_dev->parent_spec);
4652 rbd_dev->parent_spec = NULL;
4653 kfree(rbd_dev->header_name);
4654 rbd_dev->header_name = NULL;
4655 kfree(rbd_dev->header.object_prefix);
4656 rbd_dev->header.object_prefix = NULL;
/*
 * If this image has a parent (layering), create and probe an
 * rbd_device for it, recursing up the parent chain.  No-op for
 * images without a parent spec.
 */
4661 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4663 struct rbd_device *parent = NULL;
4664 struct rbd_spec *parent_spec;
4665 struct rbd_client *rbdc;
4668 if (!rbd_dev->parent_spec)
4671 * We need to pass a reference to the client and the parent
4672 * spec when creating the parent rbd_dev. Images related by
4673 * parent/child relationships always share both.
4675 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4676 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4679 parent = rbd_dev_create(rbdc, parent_spec);
/* Recursive probe: the parent may itself have a parent */
4683 ret = rbd_dev_image_probe(parent);
4686 rbd_dev->parent = parent;
/* Error path: drop the references taken above */
4691 rbd_spec_put(rbd_dev->parent_spec);
4692 kfree(rbd_dev->header_name);
4693 rbd_dev_destroy(parent);
4695 rbd_put_client(rbdc);
4696 rbd_spec_put(parent_spec);
/*
 * Set up the block-device side of a probed image: allocate a dev
 * id, a block major, the gendisk, and the sysfs device, then
 * announce the disk.  Unwinds in reverse order on failure.
 */
4702 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4706 ret = rbd_dev_mapping_set(rbd_dev);
4710 /* generate unique id: find highest unique id, add one */
4711 rbd_dev_id_get(rbd_dev);
4713 /* Fill in the device name, now that we have its id. */
4714 BUILD_BUG_ON(DEV_NAME_LEN
4715 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4716 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4718 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major */
4720 ret = register_blkdev(0, rbd_dev->name);
4723 rbd_dev->major = ret;
4725 /* Set up the blkdev mapping. */
4727 ret = rbd_init_disk(rbd_dev);
4729 goto err_out_blkdev;
4731 ret = rbd_bus_add_dev(rbd_dev);
4735 /* Everything's ready. Announce the disk to the world. */
4737 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4738 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4739 add_disk(rbd_dev->disk);
4741 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4742 (unsigned long long) rbd_dev->mapping.size);
/* Error paths: unwind in the reverse of setup order */
4747 rbd_free_disk(rbd_dev);
4749 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4751 rbd_dev_id_put(rbd_dev);
4752 rbd_dev_mapping_clear(rbd_dev);
/*
 * Build and record the header object name for this image:
 * "<image_name>RBD_SUFFIX" for format 1, or
 * "RBD_HEADER_PREFIX<image_id>" for format 2.
 */
4757 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4759 struct rbd_spec *spec = rbd_dev->spec;
4762 /* Record the header object name for this rbd image. */
4764 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
/* sizeof the string constant includes room for the trailing NUL */
4766 if (rbd_dev->image_format == 1)
4767 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4769 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4771 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4772 if (!rbd_dev->header_name)
4775 if (rbd_dev->image_format == 1)
4776 sprintf(rbd_dev->header_name, "%s%s",
4777 spec->image_name, RBD_SUFFIX);
4779 sprintf(rbd_dev->header_name, "%s%s",
4780 RBD_HEADER_PREFIX, spec->image_id);
/*
 * Release everything an image probe set up: snapshots, header
 * state, the watch on the header object, the header name, the
 * image id, and finally the rbd_dev itself.
 */
4784 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4788 rbd_remove_all_snaps(rbd_dev);
4789 rbd_dev_unprobe(rbd_dev);
/* Second arg 0 means "cancel" the header watch (best effort) */
4790 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4792 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4793 kfree(rbd_dev->header_name);
4794 rbd_dev->header_name = NULL;
4795 rbd_dev->image_format = 0;
4796 kfree(rbd_dev->spec->image_id);
4797 rbd_dev->spec->image_id = NULL;
4799 rbd_dev_destroy(rbd_dev);
4803 * Probe for the existence of the header object for the given rbd
4804 * device. For format 2 images this includes determining the image
4807 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4813 * Get the id from the image id object. If it's not a
4814 * format 2 image, we'll get ENOENT back, and we'll assume
4815 * it's a format 1 image.
4817 ret = rbd_dev_image_id(rbd_dev);
4820 rbd_assert(rbd_dev->spec->image_id);
4821 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4823 ret = rbd_dev_header_name(rbd_dev);
4825 goto err_out_format;
/* Second arg 1 means "establish" a watch on the header object */
4827 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4829 goto out_header_name;
4831 if (rbd_dev->image_format == 1)
4832 ret = rbd_dev_v1_probe(rbd_dev);
4834 ret = rbd_dev_v2_probe(rbd_dev);
4838 ret = rbd_dev_snaps_update(rbd_dev);
4842 ret = rbd_dev_spec_update(rbd_dev);
4846 ret = rbd_dev_probe_parent(rbd_dev);
/* Error paths: unwind the steps above in reverse order */
4851 rbd_remove_all_snaps(rbd_dev);
4853 rbd_dev_unprobe(rbd_dev);
4855 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4857 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4859 kfree(rbd_dev->header_name);
4860 rbd_dev->header_name = NULL;
4862 rbd_dev->image_format = 0;
4863 kfree(rbd_dev->spec->image_id);
4864 rbd_dev->spec->image_id = NULL;
4866 dout("probe failed, returning %d\n", ret);
/*
 * sysfs "add" handler: parse the user's request, look up/create the
 * ceph client, resolve the pool, then probe and set up the image.
 * Ownership of ceph_opts, rbdc, and spec transfers step by step to
 * the objects created from them (see the NULL assignments).
 */
4871 static ssize_t rbd_add(struct bus_type *bus,
4875 struct rbd_device *rbd_dev = NULL;
4876 struct ceph_options *ceph_opts = NULL;
4877 struct rbd_options *rbd_opts = NULL;
4878 struct rbd_spec *spec = NULL;
4879 struct rbd_client *rbdc;
4880 struct ceph_osd_client *osdc;
/* Hold a module reference while a mapped device exists */
4883 if (!try_module_get(THIS_MODULE))
4886 /* parse add command */
4887 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4889 goto err_out_module;
4891 rbdc = rbd_get_client(ceph_opts);
4896 ceph_opts = NULL; /* rbd_dev client now owns this */
4899 osdc = &rbdc->client->osdc;
4900 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4902 goto err_out_client;
4903 spec->pool_id = (u64)rc;
4905 /* The ceph file layout needs to fit pool id in 32 bits */
4907 if (spec->pool_id > (u64)U32_MAX) {
4908 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4909 (unsigned long long)spec->pool_id, U32_MAX);
4911 goto err_out_client;
4914 rbd_dev = rbd_dev_create(rbdc, spec);
4916 goto err_out_client;
4917 rbdc = NULL; /* rbd_dev now owns this */
4918 spec = NULL; /* rbd_dev now owns this */
4920 rbd_dev->mapping.read_only = rbd_opts->read_only;
4922 rbd_opts = NULL; /* done with this */
4924 rc = rbd_dev_image_probe(rbd_dev);
4926 goto err_out_rbd_dev;
4928 rc = rbd_dev_device_setup(rbd_dev);
/* Error paths: release whatever each stage still owns */
4932 rbd_dev_image_release(rbd_dev);
4934 rbd_dev_destroy(rbd_dev);
4936 rbd_put_client(rbdc);
4939 ceph_destroy_options(ceph_opts);
4943 module_put(THIS_MODULE);
4945 dout("Error adding device %s\n", buf);
/*
 * Find the rbd_device with the given id on the global device list.
 * Returns the device (list lock dropped) or NULL if not found.
 */
4950 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4952 struct list_head *tmp;
4953 struct rbd_device *rbd_dev;
4955 spin_lock(&rbd_dev_list_lock);
4956 list_for_each(tmp, &rbd_dev_list) {
4957 rbd_dev = list_entry(tmp, struct rbd_device, node);
4958 if (rbd_dev->dev_id == dev_id) {
4959 spin_unlock(&rbd_dev_list_lock);
4963 spin_unlock(&rbd_dev_list_lock);
/*
 * Driver-core release callback (set in rbd_bus_add_dev()): tear
 * down the block-device side set up by rbd_dev_device_setup().
 */
4969 static void rbd_dev_device_release(struct device *dev)
4971 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4973 rbd_free_disk(rbd_dev);
4974 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4975 rbd_dev_clear_mapping(rbd_dev);
4976 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4978 rbd_dev_id_put(rbd_dev);
4979 rbd_dev_mapping_clear(rbd_dev);
/*
 * Release the whole parent chain of an image, deepest ancestor
 * first, so each release only ever touches a leaf of the chain.
 */
4980 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4982 while (rbd_dev->parent) {
4983 struct rbd_device *first = rbd_dev;
4984 struct rbd_device *second = first->parent;
4985 struct rbd_device *third;
4988 * Follow to the parent with no grandparent and
4991 while (second && (third = second->parent)) {
/* "second" is now the deepest parent; detach and release it */
4996 rbd_dev_image_release(second);
4997 first->parent = NULL;
4998 first->parent_overlap = 0;
5000 rbd_assert(first->parent_spec);
5001 rbd_spec_put(first->parent_spec);
5002 first->parent_spec = NULL;
/*
 * sysfs "remove" handler: parse the target device id, refuse the
 * removal while the device is open, otherwise mark it removing and
 * tear it down.
 */
5006 static ssize_t rbd_remove(struct bus_type *bus,
5010 struct rbd_device *rbd_dev = NULL;
/* NOTE(review): strict_strtoul is the deprecated kstrtoul predecessor — consider updating */
5015 ret = strict_strtoul(buf, 10, &ul);
5019 /* convert to int; abort if we lost anything in the conversion */
5020 target_id = (int) ul;
5021 if (target_id != ul)
5024 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5026 rbd_dev = __rbd_get_dev(target_id);
/* Don't remove a device that is still open; flag it as going away */
5032 spin_lock_irq(&rbd_dev->lock);
5033 if (rbd_dev->open_count)
5036 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5037 spin_unlock_irq(&rbd_dev->lock);
5041 rbd_bus_del_dev(rbd_dev);
5042 rbd_dev_image_release(rbd_dev);
/* Drop the module reference taken in rbd_add() */
5043 module_put(THIS_MODULE);
5045 mutex_unlock(&ctl_mutex);
5051 * create control files in sysfs
/* Register the rbd root device and bus; unwind the root on bus failure */
5054 static int rbd_sysfs_init(void)
5058 ret = device_register(&rbd_root_dev);
5062 ret = bus_register(&rbd_bus_type);
5064 device_unregister(&rbd_root_dev);
/* Undo rbd_sysfs_init(): unregister bus, then the root device */
5069 static void rbd_sysfs_cleanup(void)
5071 bus_unregister(&rbd_bus_type);
5072 device_unregister(&rbd_root_dev);
/* Module init: verify libceph compatibility, then set up sysfs */
5075 static int __init rbd_init(void)
5079 if (!libceph_compatible(NULL)) {
5080 rbd_warn(NULL, "libceph incompatibility (quitting)");
5084 rc = rbd_sysfs_init();
5087 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus and root device */
5093 static void __exit rbd_exit(void)
5095 rbd_sysfs_cleanup();
5096 module_init(rbd_init);
5097 module_exit(rbd_exit);
5099 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5100 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5101 MODULE_DESCRIPTION("rados block device");
5103 /* following authorship retained from original osdblk.c */
5104 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5106 MODULE_LICENSE("GPL");