rbd.c -- Export ceph rados objects as a Linux block device

based on drivers/block/osdblk.c:

Copyright 2009 Red Hat, Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; see the file COPYING. If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

For usage instructions, please refer to:

Documentation/ABI/testing/sysfs-bus-rbd

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG /* Activate rbd_assert() calls */

* The basic unit of block I/O is a sector. It is interpreted in a
* number of contexts in Linux (blk, bio, genhd), but the default is
* universally 512 bytes. These symbols are just slightly more
* meaningful than the bare numbers they represent.
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
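/*
 * Worked example (a sketch, assuming NAME_MAX is 255 as on typical
 * Linux filesystems): sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1 == 5,
 * so RBD_MAX_SNAP_NAME_LEN is 255 - 5 == 250. Similarly, a snapshot
 * context stores one u64 id per snapshot, so 510 snapshots occupy
 * 510 * 8 == 4080 bytes, which plus the small ceph_snap_context
 * header still fits within a single 4KB page.
 */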
#define RBD_SNAP_HEAD_NAME "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX 64

#define RBD_OBJ_PREFIX_LEN_MAX 64

#define RBD_FEATURE_LAYERING (1<<0)
#define RBD_FEATURE_STRIPINGV2 (1<<1)
#define RBD_FEATURES_ALL \
(RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)

* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
* MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
* enough to hold all possible device names.
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
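/*
 * Why (5 * sizeof (int)) / 2 + 1 is enough (illustrative note, not
 * from the original source): each byte contributes at most
 * log10(256) ~= 2.41 decimal digits, which 5/2 == 2.5 safely
 * over-approximates. For a 4-byte int this gives (5 * 4) / 2 + 1
 * == 11 characters, enough for "-2147483648" (10 digits plus sign).
 */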
* block device image metadata (in-memory version)
struct rbd_image_header {
/* These four fields never change for a given rbd image */
/* The remaining fields need to be updated occasionally */
struct ceph_snap_context *snapc;

* An rbd image specification.
*
* The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
* identify an image. Each rbd_dev structure includes a pointer to
* an rbd_spec structure that encapsulates this identity.
*
* Each of the id's in an rbd_spec has an associated name. For a
* user-mapped image, the names are supplied and the id's associated
* with them are looked up. For a layered image, a parent image is
* defined by the tuple, and the names are looked up.
*
* An rbd_dev structure contains a parent_spec pointer which is
* non-null if the image it represents is a child in a layered
* image. This pointer will refer to the rbd_spec structure used
* by the parent rbd_dev for its own identity (i.e., the structure
* is shared between the parent and child).
*
* Since these structures are populated once, during the discovery
* phase of image construction, they are effectively immutable so
* we make no effort to synchronize access to them.
*
* Note that code herein does not assume the image name is known (it
* could be a null pointer).
const char *pool_name;
const char *image_id;
const char *image_name;
const char *snap_name;
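/*
 * Example (hypothetical values, for illustration only): mapping
 * image "foo" at snapshot "snap1" in pool "rbd" could produce a
 * spec with pool_id 2, pool_name "rbd", image_id "1f2e3d4c5b6a",
 * image_name "foo", snap_id 4, and snap_name "snap1". Mapping the
 * image head instead uses snap_id CEPH_NOSNAP and the
 * RBD_SNAP_HEAD_NAME ("-") snapshot name.
 */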
* an instance of the client. multiple devices may share an rbd client.
struct ceph_client *client;
struct list_head node;

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH U32_MAX /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES

OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */

struct rbd_obj_request {
const char *object_name;
u64 offset; /* object start byte */
u64 length; /* bytes from offset */

* An object request associated with an image will have its
* img_data flag set; a standalone object request will not.
*
* A standalone object request will have which == BAD_WHICH
* and a null obj_request pointer.
*
* An object request initiated in support of a layered image
* object (to check for its existence before a write) will
* have which == BAD_WHICH and a non-null obj_request pointer.
*
* Finally, an object request for rbd image data will have
* which != BAD_WHICH, and will have a non-null img_request
* pointer. The value of which will be in the range
* 0..(img_request->obj_request_count-1).
struct rbd_obj_request *obj_request; /* STAT op */
struct rbd_img_request *img_request;
/* links for img_request->obj_requests list */
struct list_head links;
u32 which; /* posn image request list */
enum obj_request_type type;
struct bio *bio_list;
struct page **copyup_pages;
struct ceph_osd_request *osd_req;
u64 xferred; /* bytes transferred */
rbd_obj_callback_t callback;
struct completion completion;

IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */

struct rbd_img_request {
struct rbd_device *rbd_dev;
u64 offset; /* starting image byte offset */
u64 length; /* byte count from offset */
u64 snap_id; /* for reads */
struct ceph_snap_context *snapc; /* for writes */
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
struct page **copyup_pages;
spinlock_t completion_lock; /* protects next_completion */
rbd_img_callback_t callback;
u64 xferred; /* aggregate bytes transferred */
int result; /* first nonzero obj_request result */
u32 obj_request_count;
struct list_head obj_requests; /* rbd_obj_request structs */

#define for_each_obj_request(ireq, oreq) \
list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
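/*
 * Usage sketch (mirrors uses further below in this file):
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 *
 * The _safe variant walks the list in reverse and tolerates deletion
 * of the current entry, which is why teardown paths such as
 * rbd_img_request_destroy() use it.
 */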
struct list_head node;

int dev_id; /* blkdev unique id */

int major; /* blkdev assigned major */
struct gendisk *disk; /* blkdev's gendisk and rq */

u32 image_format; /* Either 1 or 2 */
struct rbd_client *rbd_client;

char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

spinlock_t lock; /* queue, flags, open_count */

struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */
struct rbd_spec *spec;

struct ceph_file_layout layout;

struct ceph_osd_event *watch_event;
struct rbd_obj_request *watch_request;

struct rbd_spec *parent_spec;
struct rbd_device *parent;

/* protects updating the header */
struct rw_semaphore header_rwsem;

struct rbd_mapping mapping;

struct list_head node;

/* list of snapshots */
struct list_head snaps;

unsigned long open_count; /* protected by lock */

* Flag bits for rbd_dev->flags. If atomicity is required,
* rbd_dev->lock is used to protect access.
*
* Currently, only the "removing" flag (which is coupled with the
* "open_count" field) requires atomic access.
RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */

static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list); /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
__ATTR(remove, S_IWUSR, NULL, rbd_remove),

static struct bus_type rbd_bus_type = {
.bus_attrs = rbd_bus_attrs,

static void rbd_root_dev_release(struct device *dev)

static struct device rbd_root_dev = {
.release = rbd_root_dev_release,

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
struct va_format vaf;
printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
else if (rbd_dev->disk)
printk(KERN_WARNING "%s: %s: %pV\n",
RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_name)
printk(KERN_WARNING "%s: image %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_id)
printk(KERN_WARNING "%s: id %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
RBD_DRV_NAME, rbd_dev, &vaf);
#ifdef RBD_DEBUG
#define rbd_assert(expr) \
if (unlikely(!(expr))) { \
printk(KERN_ERR "\nAssertion failure in %s() " \
"at line %d:\n\n" \
"\trbd_assert(%s);\n\n", \
__func__, __LINE__, #expr); \
BUG(); \
}
#else /* !RBD_DEBUG */
# define rbd_assert(expr) ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
bool removing = false;

if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)

spin_lock_irq(&rbd_dev->lock);
if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
rbd_dev->open_count++;
spin_unlock_irq(&rbd_dev->lock);

mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
(void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
mutex_unlock(&ctl_mutex);

static int rbd_release(struct gendisk *disk, fmode_t mode)
struct rbd_device *rbd_dev = disk->private_data;
unsigned long open_count_before;

spin_lock_irq(&rbd_dev->lock);
open_count_before = rbd_dev->open_count--;
spin_unlock_irq(&rbd_dev->lock);
rbd_assert(open_count_before > 0);

mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
put_device(&rbd_dev->dev);
mutex_unlock(&ctl_mutex);

static const struct block_device_operations rbd_bd_ops = {
.owner = THIS_MODULE,
.release = rbd_release,

* Initialize an rbd client instance.
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;

dout("%s:\n", __func__);
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);

kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);

mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
if (IS_ERR(rbdc->client))
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

ret = ceph_open_session(rbdc->client);

spin_lock(&rbd_client_list_lock);
list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&rbd_client_list_lock);

mutex_unlock(&ctl_mutex);
dout("%s: rbdc %p\n", __func__, rbdc);

ceph_destroy_client(rbdc->client);

mutex_unlock(&ctl_mutex);

ceph_destroy_options(ceph_opts);
dout("%s: error %d\n", __func__, ret);

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
kref_get(&rbdc->kref);

* Find a ceph client with specific addr and configuration. If
* found, bump its reference count.
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
struct rbd_client *client_node;

if (ceph_opts->flags & CEPH_OPT_NOSHARE)

spin_lock(&rbd_client_list_lock);
list_for_each_entry(client_node, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, client_node->client)) {
__rbd_get_client(client_node);

spin_unlock(&rbd_client_list_lock);

return found ? client_node : NULL;

/* string args above */
/* Boolean args above */

static match_table_t rbd_opts_tokens = {
/* string args above */
{Opt_read_only, "read_only"},
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
/* Boolean args above */

#define RBD_READ_ONLY_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
struct rbd_options *rbd_opts = private;
substring_t argstr[MAX_OPT_ARGS];
int token, intval, ret;

token = match_token(c, rbd_opts_tokens, argstr);

if (token < Opt_last_int) {
ret = match_int(&argstr[0], &intval);
pr_err("bad mount option arg (not int) "
dout("got int token %d val %d\n", token, intval);
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
} else if (token > Opt_last_string && token < Opt_last_bool) {
dout("got Boolean token %d\n", token);
dout("got token %d\n", token);

rbd_opts->read_only = true;
rbd_opts->read_only = false;
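/*
 * For illustration: an option string such as "read_only" (or the
 * short form "ro") resolves to Opt_read_only above and sets
 * rbd_opts->read_only = true, while "read_write"/"rw" resolves to
 * Opt_read_write and clears the flag again.
 */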
* Get a ceph client with specific addr and configuration; if one does
* not exist, create it.
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;

rbdc = rbd_client_find(ceph_opts);
if (rbdc) /* using an existing client */
ceph_destroy_options(ceph_opts);
rbdc = rbd_client_create(ceph_opts);

* Destroy ceph client
*
* Caller must hold rbd_client_list_lock.
static void rbd_client_release(struct kref *kref)
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

dout("%s: rbdc %p\n", __func__, rbdc);
spin_lock(&rbd_client_list_lock);
list_del(&rbdc->node);
spin_unlock(&rbd_client_list_lock);

ceph_destroy_client(rbdc->client);
* Drop reference to ceph client node. If it's not referenced anymore,
* release it.
static void rbd_put_client(struct rbd_client *rbdc)
kref_put(&rbdc->kref, rbd_client_release);

static bool rbd_image_format_valid(u32 image_format)
return image_format == 1 || image_format == 2;

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)

/* The header has to start with the magic rbd header text */
if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))

/* The bio layer requires at least sector-sized I/O */
if (ondisk->options.order < SECTOR_SHIFT)

/* If we use u64 in a few spots we may be able to loosen this */
if (ondisk->options.order > 8 * sizeof (int) - 1)

* The size of a snapshot header has to fit in a size_t, and
* that limits the number of snapshots.
snap_count = le32_to_cpu(ondisk->snap_count);
size = SIZE_MAX - sizeof (struct ceph_snap_context);
if (snap_count > size / sizeof (__le64))
* Not only that, but the size of the entire snapshot
* header must also be representable in a size_t.
size -= snap_count * sizeof (__le64);
if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
* Create a new header structure, translate header format from the on-disk
* header.
static int rbd_header_from_disk(struct rbd_image_header *header,
struct rbd_image_header_ondisk *ondisk)

memset(header, 0, sizeof (*header));
snap_count = le32_to_cpu(ondisk->snap_count);
len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
if (!header->object_prefix)
memcpy(header->object_prefix, ondisk->object_prefix, len);
header->object_prefix[len] = '\0';

u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

/* Save a copy of the snapshot names */

if (snap_names_len > (u64) SIZE_MAX)
header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
if (!header->snap_names)
* Note that rbd_dev_v1_header_read() guarantees
* the ondisk buffer we're working with has
* snap_names_len bytes beyond the end of the
* snapshot id array, so this memcpy() is safe.
memcpy(header->snap_names, &ondisk->snaps[snap_count],

/* Record each snapshot's size */

size = snap_count * sizeof (*header->snap_sizes);
header->snap_sizes = kmalloc(size, GFP_KERNEL);
if (!header->snap_sizes)
for (i = 0; i < snap_count; i++)
header->snap_sizes[i] =
le64_to_cpu(ondisk->snaps[i].image_size);
header->snap_names = NULL;
header->snap_sizes = NULL;

header->features = 0; /* No features support in v1 images */
header->obj_order = ondisk->options.order;
header->crypt_type = ondisk->options.crypt_type;
header->comp_type = ondisk->options.comp_type;

/* Allocate and fill in the snapshot context */

header->image_size = le64_to_cpu(ondisk->image_size);

header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
for (i = 0; i < snap_count; i++)
header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

kfree(header->snap_sizes);
header->snap_sizes = NULL;
kfree(header->snap_names);
header->snap_names = NULL;
kfree(header->object_prefix);
header->object_prefix = NULL;

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
struct rbd_snap *snap;

if (snap_id == CEPH_NOSNAP)
return RBD_SNAP_HEAD_NAME;

list_for_each_entry(snap, &rbd_dev->snaps, node)
if (snap_id == snap->id)

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
const char *snap_name)
struct rbd_snap *snap;

list_for_each_entry(snap, &rbd_dev->snaps, node)
if (!strcmp(snap_name, snap->name))

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME))) {
rbd_dev->mapping.size = rbd_dev->header.image_size;
rbd_dev->mapping.features = rbd_dev->header.features;
struct rbd_snap *snap;

snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
rbd_dev->mapping.size = snap->size;
rbd_dev->mapping.features = snap->features;
rbd_dev->mapping.read_only = true;

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
rbd_dev->mapping.size = 0;
rbd_dev->mapping.features = 0;
rbd_dev->mapping.read_only = true;

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
rbd_dev->mapping.size = 0;
rbd_dev->mapping.features = 0;
rbd_dev->mapping.read_only = true;

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
segment = offset >> rbd_dev->header.obj_order;
ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
rbd_dev->header.object_prefix, segment);
if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
pr_err("error formatting segment name for #%llu (%d)\n",

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

return offset & (segment_size - 1);

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
u64 offset, u64 length)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

offset &= segment_size - 1;

rbd_assert(length <= U64_MAX - offset);
if (offset + length > segment_size)
length = segment_size - offset;
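/*
 * Worked example (a sketch; assumes the common object order of 22,
 * i.e. 4MB objects, and a hypothetical object prefix): with
 * obj_order = 22 and object_prefix "rb.0.1234.6b8b4567", image
 * offset 0x123456789 lands in segment 0x123456789 >> 22 == 0x48d,
 * so rbd_segment_name() produces "rb.0.1234.6b8b4567.00000000048d",
 * rbd_segment_offset() returns 0x123456789 & 0x3fffff == 0x56789,
 * and rbd_segment_length() clips any request extending past that
 * segment's 4MB boundary.
 */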
* returns the size of an object in the image
static u64 rbd_obj_bytes(struct rbd_image_header *header)
return 1 << header->obj_order;

static void bio_chain_put(struct bio *chain)
chain = chain->bi_next;

* zeros a bio chain, starting at specific offset
static void zero_bio_chain(struct bio *chain, int start_ofs)
bio_for_each_segment(bv, chain, i) {
if (pos + bv->bv_len > start_ofs) {
int remainder = max(start_ofs - pos, 0);
buf = bvec_kmap_irq(bv, &flags);
memset(buf + remainder, 0,
bv->bv_len - remainder);
bvec_kunmap_irq(buf, &flags);
chain = chain->bi_next;

* similar to zero_bio_chain(), zeros data defined by a page array,
* starting at the given byte offset from the start of the array and
* continuing up to the given end offset. The pages array is
* assumed to be big enough to hold all bytes up to the end.
static void zero_pages(struct page **pages, u64 offset, u64 end)
struct page **page = &pages[offset >> PAGE_SHIFT];

rbd_assert(end > offset);
rbd_assert(end - offset <= (u64)SIZE_MAX);
while (offset < end) {
page_offset = (size_t)(offset & ~PAGE_MASK);
length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
local_irq_save(flags);
kaddr = kmap_atomic(*page);
memset(kaddr + page_offset, 0, length);
kunmap_atomic(kaddr);
local_irq_restore(flags);

* Clone a portion of a bio, starting at the given byte offset
* and continuing for the number of bytes indicated.
static struct bio *bio_clone_range(struct bio *bio_src,
unsigned int offset,
unsigned short end_idx;
unsigned short vcnt;

/* Handle the easy case for the caller */

if (!offset && len == bio_src->bi_size)
return bio_clone(bio_src, gfpmask);

if (WARN_ON_ONCE(!len))
if (WARN_ON_ONCE(len > bio_src->bi_size))
if (WARN_ON_ONCE(offset > bio_src->bi_size - len))

/* Find first affected segment... */

__bio_for_each_segment(bv, bio_src, idx, 0) {
if (resid < bv->bv_len)
resid -= bv->bv_len;

/* ...and the last affected segment */

__bio_for_each_segment(bv, bio_src, end_idx, idx) {
if (resid <= bv->bv_len)
resid -= bv->bv_len;
vcnt = end_idx - idx + 1;

/* Build the clone */

bio = bio_alloc(gfpmask, (unsigned int) vcnt);
return NULL; /* ENOMEM */

bio->bi_bdev = bio_src->bi_bdev;
bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
bio->bi_rw = bio_src->bi_rw;
bio->bi_flags |= 1 << BIO_CLONED;

* Copy over our part of the bio_vec, then update the first
* and last (or only) entries.
memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
vcnt * sizeof (struct bio_vec));
bio->bi_io_vec[0].bv_offset += voff;
bio->bi_io_vec[0].bv_len -= voff;
bio->bi_io_vec[vcnt - 1].bv_len = resid;
bio->bi_io_vec[0].bv_len = len;

bio->bi_vcnt = vcnt;

* Clone a portion of a bio chain, starting at the given byte offset
* into the first bio in the source chain and continuing for the
* number of bytes indicated. The result is another bio chain of
* exactly the given length, or a null pointer on error.
*
* The bio_src and offset parameters are both in-out. On entry they
* refer to the first source bio and the offset into that bio where
* the start of data to be cloned is located.
*
* On return, bio_src is updated to refer to the bio in the source
* chain that contains first un-cloned byte, and *offset will
* contain the offset of that byte within that bio.
static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int *offset,
struct bio *bi = *bio_src;
unsigned int off = *offset;
struct bio *chain = NULL;

/* Build up a chain of clone bios up to the limit */

if (!bi || off >= bi->bi_size || !len)
return NULL; /* Nothing to clone */

unsigned int bi_size;

rbd_warn(NULL, "bio_chain exhausted with %u left", len);
goto out_err; /* EINVAL; ran out of bio's */
bi_size = min_t(unsigned int, bi->bi_size - off, len);
bio = bio_clone_range(bi, off, bi_size, gfpmask);
goto out_err; /* ENOMEM */

end = &bio->bi_next;

if (off == bi->bi_size) {

bio_chain_put(chain);
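/*
 * Usage sketch (hypothetical caller, for illustration): cloning one
 * object's worth of data out of a request's bio chain might look
 * like
 *
 *	struct bio *bi = rq->bio;
 *	unsigned int off = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bi, &off, seg_len, GFP_ATOMIC);
 *
 * after which bi/off identify the first un-cloned byte, ready to be
 * passed back in for the next segment. rbd_img_request_fill() below
 * depends on exactly this in-out behavior.
 */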
* The default/initial value for all object request flags is 0. For
* each flag, once its value is set to 1 it is never reset to 0
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
struct rbd_device *rbd_dev;

rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;

static void obj_request_done_set(struct rbd_obj_request *obj_request)
if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
struct rbd_device *rbd_dev = NULL;

if (obj_request_img_data_test(obj_request))
rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked done\n",

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;

* This sets the KNOWN flag after (possibly) setting the EXISTS
* flag. The latter is set based on the "exists" value provided.
*
* Note that for our purposes once an object exists it never goes
* away again. It's possible that the responses from two existence
* checks are separated by the creation of the target object, and
* the first ("doesn't exist") response arrives *after* the second
* ("does exist"). In that case we ignore the second one.
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
set_bit(OBJ_REQ_KNOWN, &obj_request->flags);

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
dout("%s: obj %p (was %d)\n", __func__, obj_request,
atomic_read(&obj_request->kref.refcount));
kref_get(&obj_request->kref);

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
rbd_assert(obj_request != NULL);
dout("%s: obj %p (was %d)\n", __func__, obj_request,
atomic_read(&obj_request->kref.refcount));
kref_put(&obj_request->kref, rbd_obj_request_destroy);

static void rbd_img_request_get(struct rbd_img_request *img_request)
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
kref_get(&img_request->kref);

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
rbd_assert(img_request != NULL);
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
kref_put(&img_request->kref, rbd_img_request_destroy);

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
rbd_assert(obj_request->img_request == NULL);

/* Image request now owns object's original reference */
obj_request->img_request = img_request;
obj_request->which = img_request->obj_request_count;
rbd_assert(!obj_request_img_data_test(obj_request));
obj_request_img_data_set(obj_request);
rbd_assert(obj_request->which != BAD_WHICH);
img_request->obj_request_count++;
list_add_tail(&obj_request->links, &img_request->obj_requests);
dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
obj_request->which);

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
rbd_assert(obj_request->which != BAD_WHICH);

dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
obj_request->which);
list_del(&obj_request->links);
rbd_assert(img_request->obj_request_count > 0);
img_request->obj_request_count--;
rbd_assert(obj_request->which == img_request->obj_request_count);
obj_request->which = BAD_WHICH;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->img_request == img_request);
obj_request->img_request = NULL;
obj_request->callback = NULL;
rbd_obj_request_put(obj_request);

static bool obj_request_type_valid(enum obj_request_type type)
case OBJ_REQUEST_NODATA:
case OBJ_REQUEST_BIO:
case OBJ_REQUEST_PAGES:

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

return ceph_osdc_start_request(osdc, obj_request->osd_req, false);

static void rbd_img_request_complete(struct rbd_img_request *img_request)
dout("%s: img %p\n", __func__, img_request);

* If no error occurred, compute the aggregate transfer
* count for the image request. We could instead use
* atomic64_cmpxchg() to update it as each object request
* completes; not clear which way is better off hand.
if (!img_request->result) {
struct rbd_obj_request *obj_request;

for_each_obj_request(img_request, obj_request)
xferred += obj_request->xferred;
img_request->xferred = xferred;

if (img_request->callback)
img_request->callback(img_request);
rbd_img_request_put(img_request);

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);

return wait_for_completion_interruptible(&obj_request->completion);
* The default/initial value for all image request flags is 0. Each
* is conditionally set to 1 at image request initialization time
* and currently never changes thereafter.
static void img_request_write_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_WRITE, &img_request->flags);

static bool img_request_write_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;

static void img_request_child_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_CHILD, &img_request->flags);

static bool img_request_child_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;

static void img_request_layered_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_LAYERED, &img_request->flags);

static bool img_request_layered_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;

rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
u64 xferred = obj_request->xferred;
u64 length = obj_request->length;

dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
obj_request, obj_request->img_request, obj_request->result,
* ENOENT means a hole in the image. We zero-fill the
* entire length of the request. A short read also implies
* zero-fill to the end of the request. Either way we
* update the xferred count to indicate the whole request was satisfied.
rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
if (obj_request->result == -ENOENT) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, 0);
zero_pages(obj_request->pages, 0, length);
obj_request->result = 0;
obj_request->xferred = length;
} else if (xferred < length && !obj_request->result) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, xferred);
zero_pages(obj_request->pages, xferred, length);
obj_request->xferred = length;
obj_request_done_set(obj_request);

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
dout("%s: obj %p cb %p\n", __func__, obj_request,
obj_request->callback);
if (obj_request->callback)
obj_request->callback(obj_request);
complete_all(&obj_request->completion);

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);
obj_request_done_set(obj_request);

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = NULL;
struct rbd_device *rbd_dev = NULL;
bool layered = false;

if (obj_request_img_data_test(obj_request)) {
img_request = obj_request->img_request;
layered = img_request && img_request_layered_test(img_request);
rbd_dev = img_request->rbd_dev;

dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
obj_request, img_request, obj_request->result,
obj_request->xferred, obj_request->length);
if (layered && obj_request->result == -ENOENT &&
obj_request->img_offset < rbd_dev->parent_overlap)
rbd_img_parent_read(obj_request);
else if (img_request)
rbd_img_obj_request_read_callback(obj_request);
obj_request_done_set(obj_request);

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p result %d %llu\n", __func__, obj_request,
obj_request->result, obj_request->length);
* There is no such thing as a successful short write. Set
* it to our originally-requested length.
obj_request->xferred = obj_request->length;
obj_request_done_set(obj_request);

* For a simple stat call there's nothing to do. We'll do more if
* this is part of a write sequence for a layered image.
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);
obj_request_done_set(obj_request);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
struct rbd_obj_request *obj_request = osd_req->r_priv;

dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
rbd_assert(osd_req == obj_request->osd_req);
if (obj_request_img_data_test(obj_request)) {
rbd_assert(obj_request->img_request);
rbd_assert(obj_request->which != BAD_WHICH);
rbd_assert(obj_request->which == BAD_WHICH);

if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

BUG_ON(osd_req->r_num_ops > 2);

* We support a 64-bit length, but ultimately it has to be
* passed to blk_end_request(), which takes an unsigned int.
obj_request->xferred = osd_req->r_reply_op_len[0];
rbd_assert(obj_request->xferred < (u64)UINT_MAX);
opcode = osd_req->r_ops[0].op;
case CEPH_OSD_OP_READ:
rbd_osd_read_callback(obj_request);
case CEPH_OSD_OP_WRITE:
rbd_osd_write_callback(obj_request);
case CEPH_OSD_OP_STAT:
rbd_osd_stat_callback(obj_request);
case CEPH_OSD_OP_CALL:
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
rbd_osd_trivial_callback(obj_request);
rbd_warn(NULL, "%s: unsupported op %hu\n",
obj_request->object_name, (unsigned short) opcode);

if (obj_request_done_test(obj_request))
rbd_obj_request_complete(obj_request);

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;

rbd_assert(osd_req != NULL);

snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
ceph_osdc_build_request(osd_req, obj_request->offset,
NULL, snap_id, NULL);

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
struct ceph_snap_context *snapc;
struct timespec mtime = CURRENT_TIME;

rbd_assert(osd_req != NULL);

snapc = img_request ? img_request->snapc : NULL;
ceph_osdc_build_request(osd_req, obj_request->offset,
snapc, CEPH_NOSNAP, &mtime);

static struct ceph_osd_request *rbd_osd_req_create(
struct rbd_device *rbd_dev,
struct rbd_obj_request *obj_request)
struct ceph_snap_context *snapc = NULL;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;

if (obj_request_img_data_test(obj_request)) {
struct rbd_img_request *img_request = obj_request->img_request;

rbd_assert(write_request ==
img_request_write_test(img_request));
snapc = img_request->snapc;

/* Allocate and initialize the request, for the single op */

osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
return NULL; /* ENOMEM */

osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_flags = CEPH_OSD_FLAG_READ;

osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;

osd_req->r_oid_len = strlen(obj_request->object_name);
rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

osd_req->r_file_layout = rbd_dev->layout; /* struct */
* Create a copyup osd request based on the information in the
* object request supplied. A copyup request has two osd ops:
* a copyup method call and a "normal" write request.
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
struct ceph_snap_context *snapc;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;

rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_assert(img_request_write_test(img_request));

/* Allocate and initialize the request, for the two ops */

snapc = img_request->snapc;
rbd_dev = img_request->rbd_dev;
osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
return NULL; /* ENOMEM */

osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;

osd_req->r_oid_len = strlen(obj_request->object_name);
rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

osd_req->r_file_layout = rbd_dev->layout; /* struct */

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
ceph_osdc_put_request(osd_req);

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
u64 offset, u64 length,
enum obj_request_type type)
struct rbd_obj_request *obj_request;

rbd_assert(obj_request_type_valid(type));

size = strlen(object_name) + 1;
obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);

name = (char *)(obj_request + 1);
obj_request->object_name = memcpy(name, object_name, size);
obj_request->offset = offset;
obj_request->length = length;
obj_request->flags = 0;
obj_request->which = BAD_WHICH;
obj_request->type = type;
INIT_LIST_HEAD(&obj_request->links);
init_completion(&obj_request->completion);
kref_init(&obj_request->kref);

dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
offset, length, (int)type, obj_request);

static void rbd_obj_request_destroy(struct kref *kref)
struct rbd_obj_request *obj_request;

obj_request = container_of(kref, struct rbd_obj_request, kref);

dout("%s: obj %p\n", __func__, obj_request);

rbd_assert(obj_request->img_request == NULL);
rbd_assert(obj_request->which == BAD_WHICH);

if (obj_request->osd_req)
rbd_osd_req_destroy(obj_request->osd_req);

rbd_assert(obj_request_type_valid(obj_request->type));
switch (obj_request->type) {
case OBJ_REQUEST_NODATA:
break; /* Nothing to do */
case OBJ_REQUEST_BIO:
if (obj_request->bio_list)
bio_chain_put(obj_request->bio_list);
case OBJ_REQUEST_PAGES:
if (obj_request->pages)
ceph_release_page_vector(obj_request->pages,
obj_request->page_count);

* Caller is responsible for filling in the list of object requests
* that comprises the image request, and the Linux request pointer
* (if there is one).
static struct rbd_img_request *rbd_img_request_create(
struct rbd_device *rbd_dev,
u64 offset, u64 length,
struct rbd_img_request *img_request;

img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);

if (write_request) {
down_read(&rbd_dev->header_rwsem);
ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);

img_request->rq = NULL;
img_request->rbd_dev = rbd_dev;
img_request->offset = offset;
img_request->length = length;
img_request->flags = 0;
if (write_request) {
img_request_write_set(img_request);
img_request->snapc = rbd_dev->header.snapc;
img_request->snap_id = rbd_dev->spec->snap_id;
img_request_child_set(img_request);
if (rbd_dev->parent_spec)
img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
img_request->callback = NULL;
img_request->result = 0;
img_request->obj_request_count = 0;
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);

rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */

dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
write_request ? "write" : "read", offset, length,

static void rbd_img_request_destroy(struct kref *kref)
struct rbd_img_request *img_request;
struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request;

img_request = container_of(kref, struct rbd_img_request, kref);

dout("%s: img %p\n", __func__, img_request);

for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
rbd_assert(img_request->obj_request_count == 0);

if (img_request_write_test(img_request))
ceph_put_snap_context(img_request->snapc);

if (img_request_child_test(img_request))
rbd_obj_request_put(img_request->obj_request);

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
unsigned int xferred;

rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;

rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
xferred = (unsigned int)obj_request->xferred;
result = obj_request->result;
struct rbd_device *rbd_dev = img_request->rbd_dev;

rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
img_request_write_test(img_request) ? "write" : "read",
obj_request->length, obj_request->img_offset,
obj_request->offset);
rbd_warn(rbd_dev, " result %d xferred %x\n",
if (!img_request->result)
img_request->result = result;

/* Image object requests don't own their page array */

if (obj_request->type == OBJ_REQUEST_PAGES) {
obj_request->pages = NULL;
obj_request->page_count = 0;

if (img_request_child_test(img_request)) {
rbd_assert(img_request->obj_request != NULL);
more = obj_request->which < img_request->obj_request_count - 1;
rbd_assert(img_request->rq != NULL);
more = blk_end_request(img_request->rq, result, xferred);

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
u32 which = obj_request->which;

rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;

dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
rbd_assert(img_request != NULL);
rbd_assert(img_request->obj_request_count > 0);
rbd_assert(which != BAD_WHICH);
rbd_assert(which < img_request->obj_request_count);
rbd_assert(which >= img_request->next_completion);

spin_lock_irq(&img_request->completion_lock);
if (which != img_request->next_completion)

for_each_obj_request_from(img_request, obj_request) {
rbd_assert(which < img_request->obj_request_count);
if (!obj_request_done_test(obj_request))
more = rbd_img_obj_end_request(obj_request);

rbd_assert(more ^ (which == img_request->obj_request_count));
img_request->next_completion = which;

spin_unlock_irq(&img_request->completion_lock);

rbd_img_request_complete(img_request);

* Split up an image request into one or more object requests, each
* to a different object. The "type" parameter indicates whether
* "data_desc" is the pointer to the head of a list of bio
* structures, or the base of a page array. In either case this
* function assumes data_desc describes memory sufficient to hold
* all data described by the image request.
static int rbd_img_request_fill(struct rbd_img_request *img_request,
enum obj_request_type type,
struct rbd_device *rbd_dev = img_request->rbd_dev;
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
struct bio *bio_list;
unsigned int bio_offset = 0;
struct page **pages;

dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
(int)type, data_desc);

opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
img_offset = img_request->offset;
resid = img_request->length;
rbd_assert(resid > 0);

if (type == OBJ_REQUEST_BIO) {
bio_list = data_desc;
rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
rbd_assert(type == OBJ_REQUEST_PAGES);

struct ceph_osd_request *osd_req;
const char *object_name;

object_name = rbd_segment_name(rbd_dev, img_offset);
offset = rbd_segment_offset(rbd_dev, img_offset);
length = rbd_segment_length(rbd_dev, img_offset, resid);
obj_request = rbd_obj_request_create(object_name,
offset, length, type);
kfree(object_name); /* object request has its own copy */

if (type == OBJ_REQUEST_BIO) {
unsigned int clone_size;

rbd_assert(length <= (u64)UINT_MAX);
clone_size = (unsigned int)length;
obj_request->bio_list =
bio_chain_clone_range(&bio_list,
if (!obj_request->bio_list)
unsigned int page_count;

obj_request->pages = pages;
page_count = (u32)calc_pages_for(offset, length);
obj_request->page_count = page_count;
if ((offset + length) & ~PAGE_MASK)
page_count--; /* more on last page */
pages += page_count;

osd_req = rbd_osd_req_create(rbd_dev, write_request,
obj_request->osd_req = osd_req;
obj_request->callback = rbd_img_obj_callback;

osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
if (type == OBJ_REQUEST_BIO)
osd_req_op_extent_osd_data_bio(osd_req, 0,
obj_request->bio_list, length);
osd_req_op_extent_osd_data_pages(osd_req, 0,
obj_request->pages, length,
offset & ~PAGE_MASK, false, false);

rbd_osd_req_format_write(obj_request);
rbd_osd_req_format_read(obj_request);

obj_request->img_offset = img_offset;
rbd_img_obj_request_add(img_request, obj_request);

img_offset += length;

rbd_obj_request_put(obj_request);

for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_obj_request_put(obj_request);
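/*
 * End-to-end sketch (abridged and assumption-laden; the block-layer
 * request function that actually drives this is outside this
 * excerpt): servicing a read might proceed roughly as
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *					     false, false);
 *	img_request->rq = rq;
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *				      rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *
 * with rbd_img_obj_callback() completing the blk request piecewise
 * as each object request finishes.
 */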
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;

rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);

rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
length = (u64)1 << rbd_dev->header.obj_order;
page_count = (u32)calc_pages_for(0, length);

rbd_assert(obj_request->copyup_pages);
ceph_release_page_vector(obj_request->copyup_pages, page_count);
obj_request->copyup_pages = NULL;

* We want the transfer count to reflect the size of the
* original write request. There is no such thing as a
* successful short write, so if the request was successful
* we can just set it to the originally-requested length.
if (!obj_request->result)
obj_request->xferred = obj_request->length;

/* Finish up with the normal image object callback */

rbd_img_obj_callback(obj_request);

rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
struct rbd_obj_request *orig_request;
struct ceph_osd_request *osd_req;
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages;

rbd_assert(img_request_child_test(img_request));

/* First get what we need from the image request */

pages = img_request->copyup_pages;
rbd_assert(pages != NULL);
img_request->copyup_pages = NULL;

orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
result = img_request->result;
obj_size = img_request->length;
xferred = img_request->xferred;

rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

rbd_img_request_put(img_request);

/* Allocate the new copyup osd request for the original request */

rbd_assert(!orig_request->osd_req);
osd_req = rbd_osd_req_create_copyup(orig_request);
orig_request->osd_req = osd_req;
orig_request->copyup_pages = pages;

/* Initialize the copyup op */

osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,

/* Then the original write request op */

osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
orig_request->offset,
orig_request->length, 0, 0);
osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
orig_request->length);

rbd_osd_req_format_write(orig_request);

/* All set, send it off. */

orig_request->callback = rbd_img_obj_copyup_callback;
osdc = &rbd_dev->rbd_client->client->osdc;
result = rbd_obj_request_submit(osdc, orig_request);

/* Record the error code and complete the request */

orig_request->result = result;
orig_request->xferred = 0;
obj_request_done_set(orig_request);
rbd_obj_request_complete(orig_request);

* Read from the parent image the range of data that covers the
* entire target of the given object request. This is used for
* satisfying a layered image write request when the target of an
* object request from the image request does not exist.
*
* A page array big enough to hold the returned data is allocated
* and supplied to rbd_img_request_fill() as the "data descriptor."
* When the read completes, this page array will be transferred to
* the original object request for the copyup operation.
*
* If an error occurs, record it as the result of the original
* object request and mark it done so it gets completed.
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = NULL;
struct rbd_img_request *parent_request = NULL;
struct rbd_device *rbd_dev;
struct page **pages = NULL;

rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
* First things first. The original osd request is of no
* use to us any more; we'll need a new one that can hold
* the two ops in a copyup request. We'll get that later,
* but for now we can release the old one.
rbd_osd_req_destroy(obj_request->osd_req);
obj_request->osd_req = NULL;

* Determine the byte range covered by the object in the
* child image to which the original request was to be sent.
img_offset = obj_request->img_offset - obj_request->offset;
length = (u64)1 << rbd_dev->header.obj_order;
* There is no defined parent data beyond the parent
* overlap, so limit what we read at that boundary if necessary.
if (img_offset + length > rbd_dev->parent_overlap) {
rbd_assert(img_offset < rbd_dev->parent_overlap);
length = rbd_dev->parent_overlap - img_offset;

* Allocate a page array big enough to receive the data read
page_count = (u32)calc_pages_for(0, length);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages)) {
result = PTR_ERR(pages);

parent_request = rbd_img_request_create(rbd_dev->parent,
if (!parent_request)
rbd_obj_request_get(obj_request);
parent_request->obj_request = obj_request;

result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
parent_request->copyup_pages = pages;

parent_request->callback = rbd_img_obj_parent_read_full_callback;
result = rbd_img_request_submit(parent_request);

parent_request->copyup_pages = NULL;
parent_request->obj_request = NULL;
rbd_obj_request_put(obj_request);

ceph_release_page_vector(pages, page_count);

rbd_img_request_put(parent_request);
obj_request->result = result;
obj_request->xferred = 0;
obj_request_done_set(obj_request);

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
struct rbd_obj_request *orig_request;

rbd_assert(!obj_request_img_data_test(obj_request));

* All we need from the object request is the original
* request and the result of the STAT op. Grab those, then
* we're done with the request.
orig_request = obj_request->obj_request;
obj_request->obj_request = NULL;
rbd_assert(orig_request);
rbd_assert(orig_request->img_request);

result = obj_request->result;
obj_request->result = 0;

dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
obj_request, orig_request, result,
obj_request->xferred, obj_request->length);
rbd_obj_request_put(obj_request);

rbd_assert(orig_request);
rbd_assert(orig_request->img_request);

* Our only purpose here is to determine whether the object
* exists, and we don't want to treat the non-existence as
* an error. If something else comes back, transfer the
* error to the original request and complete it now.
obj_request_existence_set(orig_request, true);
} else if (result == -ENOENT) {
obj_request_existence_set(orig_request, false);
} else if (result) {
orig_request->result = result;

* Resubmit the original request now that we have recorded
* whether the target object exists.
orig_request->result = rbd_img_obj_request_submit(orig_request);

if (orig_request->result)
rbd_obj_request_complete(orig_request);
rbd_obj_request_put(orig_request);

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
struct rbd_obj_request *stat_request;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct page **pages = NULL;
2271 * The response data for a STAT call consists of:
2272 *     le64 length;
2273 *     struct {
2274 *         le32 tv_sec;
2275 *         le32 tv_nsec;
2276 *     } mtime;
2278 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279 page_count = (u32)calc_pages_for(0, size);
2280 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2282 return PTR_ERR(pages);
2285 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2290 rbd_obj_request_get(obj_request);
2291 stat_request->obj_request = obj_request;
2292 stat_request->pages = pages;
2293 stat_request->page_count = page_count;
2295 rbd_assert(obj_request->img_request);
2296 rbd_dev = obj_request->img_request->rbd_dev;
2297 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2299 if (!stat_request->osd_req)
2301 stat_request->callback = rbd_img_obj_exists_callback;
2303 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2306 rbd_osd_req_format_read(stat_request);
2308 osdc = &rbd_dev->rbd_client->client->osdc;
2309 ret = rbd_obj_request_submit(osdc, stat_request);
2312 rbd_obj_request_put(obj_request);
2317 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2319 struct rbd_img_request *img_request;
2320 struct rbd_device *rbd_dev;
2323 rbd_assert(obj_request_img_data_test(obj_request));
2325 img_request = obj_request->img_request;
2326 rbd_assert(img_request);
2327 rbd_dev = img_request->rbd_dev;
2330 * Only writes to layered images need special handling.
2331 * Reads and non-layered writes are simple object requests.
2332 * Layered writes that start beyond the end of the overlap
2333 * with the parent have no parent data, so they too are
2334 * simple object requests. Finally, if the target object is
2335 * known to already exist, its parent data has already been
2336 * copied, so a write to the object can also be handled as a
2337 * simple object request.
2339 if (!img_request_write_test(img_request) ||
2340 !img_request_layered_test(img_request) ||
2341 rbd_dev->parent_overlap <= obj_request->img_offset ||
2342 ((known = obj_request_known_test(obj_request)) &&
2343 obj_request_exists_test(obj_request))) {
2345 struct rbd_device *rbd_dev;
2346 struct ceph_osd_client *osdc;
2348 rbd_dev = obj_request->img_request->rbd_dev;
2349 osdc = &rbd_dev->rbd_client->client->osdc;
2351 return rbd_obj_request_submit(osdc, obj_request);
2355 * It's a layered write. The target object might exist but
2356 * we may not know that yet. If we know it doesn't exist,
2357 * start by reading the data for the full target object from
2358 * the parent so we can use it for a copyup to the target.
2361 return rbd_img_obj_parent_read_full(obj_request);
2363 /* We don't know whether the target exists. Go find out. */
2365 return rbd_img_obj_exists_submit(obj_request);
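/*
 * A sketch of the decision made in rbd_img_obj_request_submit()
 * above (informal pseudocode summarizing the tests, not the exact
 * control flow):
 *
 *	read, non-layered, beyond parent overlap, or known to exist
 *		-> submit a plain osd request
 *	known not to exist
 *		-> rbd_img_obj_parent_read_full(): read the object's
 *		   range from the parent, then copyup and write
 *	existence unknown
 *		-> rbd_img_obj_exists_submit(): STAT the object,
 *		   record the result, and resubmit
 */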
2368 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2370 struct rbd_obj_request *obj_request;
2371 struct rbd_obj_request *next_obj_request;
2373 dout("%s: img %p\n", __func__, img_request);
2374 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2377 ret = rbd_img_obj_request_submit(obj_request);
2385 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2387 struct rbd_obj_request *obj_request;
2388 struct rbd_device *rbd_dev;
2391 rbd_assert(img_request_child_test(img_request));
2393 obj_request = img_request->obj_request;
2394 rbd_assert(obj_request);
2395 rbd_assert(obj_request->img_request);
2397 obj_request->result = img_request->result;
2398 if (obj_request->result)
2402 * We need to zero anything beyond the parent overlap
2403 * boundary. Since rbd_img_obj_request_read_callback()
2404 * will zero anything beyond the end of a short read, an
2405 * easy way to do this is to pretend the data from the
2406 * parent came up short--ending at the overlap boundary.
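	/*
	 * For example (illustrative numbers): with a 4 MB parent
	 * overlap, a child read covering image bytes
	 * 0x380000..0x480000 is reported below as having transferred
	 * only 0x80000 bytes, so the read callback zeroes the final
	 * 0x80000 bytes of the buffer.
	 */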
2408 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409 obj_end = obj_request->img_offset + obj_request->length;
2410 rbd_dev = obj_request->img_request->rbd_dev;
2411 if (obj_end > rbd_dev->parent_overlap) {
2414 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415 xferred = rbd_dev->parent_overlap -
2416 obj_request->img_offset;
2418 obj_request->xferred = min(img_request->xferred, xferred);
2420 obj_request->xferred = img_request->xferred;
2423 rbd_img_obj_request_read_callback(obj_request);
2424 rbd_obj_request_complete(obj_request);
2427 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2429 struct rbd_device *rbd_dev;
2430 struct rbd_img_request *img_request;
2433 rbd_assert(obj_request_img_data_test(obj_request));
2434 rbd_assert(obj_request->img_request != NULL);
2435 rbd_assert(obj_request->result == (s32) -ENOENT);
2436 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2438 rbd_dev = obj_request->img_request->rbd_dev;
2439 rbd_assert(rbd_dev->parent != NULL);
2440 /* rbd_read_finish(obj_request, obj_request->length); */
2441 img_request = rbd_img_request_create(rbd_dev->parent,
2442 obj_request->img_offset,
2443 obj_request->length,
2449 rbd_obj_request_get(obj_request);
2450 img_request->obj_request = obj_request;
2452 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453 obj_request->bio_list);
2457 img_request->callback = rbd_img_parent_read_callback;
2458 result = rbd_img_request_submit(img_request);
2465 rbd_img_request_put(img_request);
2466 obj_request->result = result;
2467 obj_request->xferred = 0;
2468 obj_request_done_set(obj_request);
2471 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2472 u64 ver, u64 notify_id)
2474 struct rbd_obj_request *obj_request;
2475 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2478 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2479 OBJ_REQUEST_NODATA);
2484 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2485 if (!obj_request->osd_req)
2487 obj_request->callback = rbd_obj_request_put;
2489 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2491 rbd_osd_req_format_read(obj_request);
2493 ret = rbd_obj_request_submit(osdc, obj_request);
2496 rbd_obj_request_put(obj_request);
2501 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2503 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2509 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2510 rbd_dev->header_name, (unsigned long long) notify_id,
2511 (unsigned int) opcode);
2512 (void)rbd_dev_refresh(rbd_dev, &hver);
2514 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2518 * Request sync osd watch/unwatch. The value of "start" determines
2519 * whether a watch request is being initiated or torn down.
2521 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2523 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2524 struct rbd_obj_request *obj_request;
2527 rbd_assert(start ^ !!rbd_dev->watch_event);
2528 rbd_assert(start ^ !!rbd_dev->watch_request);
2531 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2532 &rbd_dev->watch_event);
2535 rbd_assert(rbd_dev->watch_event != NULL);
2539 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2540 OBJ_REQUEST_NODATA);
2544 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2545 if (!obj_request->osd_req)
2549 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2551 ceph_osdc_unregister_linger_request(osdc,
2552 rbd_dev->watch_request->osd_req);
2554 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2555 rbd_dev->watch_event->cookie, 0, start);
2556 rbd_osd_req_format_write(obj_request);
2558 ret = rbd_obj_request_submit(osdc, obj_request);
2561 ret = rbd_obj_request_wait(obj_request);
2564 ret = obj_request->result;
2569 * A watch request is set to linger, so the underlying osd
2570 * request won't go away until we unregister it. We retain
2571 * a pointer to the object request during that time (in
2572 * rbd_dev->watch_request), so we'll keep a reference to
2573 * it. We'll drop that reference (below) after we've
2574 * unregistered it.
2577 rbd_dev->watch_request = obj_request;
2582 /* We have successfully torn down the watch request */
2584 rbd_obj_request_put(rbd_dev->watch_request);
2585 rbd_dev->watch_request = NULL;
2587 /* Cancel the event if we're tearing down, or on error */
2588 ceph_osdc_cancel_event(rbd_dev->watch_event);
2589 rbd_dev->watch_event = NULL;
2591 rbd_obj_request_put(obj_request);
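/*
 * A sketch of the header watch/notify flow implemented above (an
 * informal summary): the client registers a lingering
 * CEPH_OSD_OP_WATCH on the header object.  When another client
 * changes the header, the osd delivers a notification;
 * rbd_watch_cb() refreshes the device and then acknowledges via
 * rbd_obj_notify_ack() (CEPH_OSD_OP_NOTIFY_ACK) so the notifier's
 * operation can complete.
 */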
2597 * Synchronous osd object method call. Returns the number of bytes
2598 * returned in the inbound buffer, or a negative error code.
2600 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2601 const char *object_name,
2602 const char *class_name,
2603 const char *method_name,
2604 const void *outbound,
2605 size_t outbound_size,
2607 size_t inbound_size,
2610 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2611 struct rbd_obj_request *obj_request;
2612 struct page **pages;
2617 * Method calls are ultimately read operations. The result
2618 * should be placed into the inbound buffer provided. They
2619 * also supply outbound data--parameters for the object
2620 * method. Currently if this is present it will be a
2621 * snapshot id.
2623 page_count = (u32)calc_pages_for(0, inbound_size);
2624 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2626 return PTR_ERR(pages);
2629 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2634 obj_request->pages = pages;
2635 obj_request->page_count = page_count;
2637 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2638 if (!obj_request->osd_req)
2641 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2642 class_name, method_name);
2643 if (outbound_size) {
2644 struct ceph_pagelist *pagelist;
2646 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2650 ceph_pagelist_init(pagelist);
2651 ceph_pagelist_append(pagelist, outbound, outbound_size);
2652 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2655 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2656 obj_request->pages, inbound_size,
2658 rbd_osd_req_format_read(obj_request);
2660 ret = rbd_obj_request_submit(osdc, obj_request);
2663 ret = rbd_obj_request_wait(obj_request);
2667 ret = obj_request->result;
2671 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2672 ret = (int)obj_request->xferred;
2673 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2675 *version = obj_request->version;
2678 rbd_obj_request_put(obj_request);
2680 ceph_release_page_vector(pages, page_count);
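/*
 * A usage sketch for rbd_obj_method_sync() (illustrative only; it
 * mirrors the "get_object_prefix" call made from
 * rbd_dev_v2_object_prefix() below, with the buffer name invented
 * for the example):
 *
 *	char buf[RBD_OBJ_PREFIX_LEN_MAX];
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_object_prefix", NULL, 0,
 *				buf, sizeof (buf), NULL);
 *
 * On success ret is the number of reply bytes placed in buf; on
 * failure it is a negative errno.
 */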
2685 static void rbd_request_fn(struct request_queue *q)
2686 __releases(q->queue_lock) __acquires(q->queue_lock)
2688 struct rbd_device *rbd_dev = q->queuedata;
2689 bool read_only = rbd_dev->mapping.read_only;
2693 while ((rq = blk_fetch_request(q))) {
2694 bool write_request = rq_data_dir(rq) == WRITE;
2695 struct rbd_img_request *img_request;
2699 /* Ignore any non-FS requests that filter through. */
2701 if (rq->cmd_type != REQ_TYPE_FS) {
2702 dout("%s: non-fs request type %d\n", __func__,
2703 (int) rq->cmd_type);
2704 __blk_end_request_all(rq, 0);
2708 /* Ignore/skip any zero-length requests */
2710 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2711 length = (u64) blk_rq_bytes(rq);
2714 dout("%s: zero-length request\n", __func__);
2715 __blk_end_request_all(rq, 0);
2719 spin_unlock_irq(q->queue_lock);
2721 /* Disallow writes to a read-only device */
2723 if (write_request) {
2727 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2731 * Quit early if the mapped snapshot no longer
2732 * exists. It's still possible the snapshot will
2733 * have disappeared by the time our request arrives
2734 * at the osd, but there's no sense in sending it if
2735 * we already know.
2737 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2738 dout("request for non-existent snapshot\n");
2739 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2745 if (offset && length > U64_MAX - offset + 1) {
2746 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2748 goto end_request; /* Shouldn't happen */
2752 img_request = rbd_img_request_create(rbd_dev, offset, length,
2753 write_request, false);
2757 img_request->rq = rq;
2759 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2762 result = rbd_img_request_submit(img_request);
2764 rbd_img_request_put(img_request);
2766 spin_lock_irq(q->queue_lock);
2768 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2769 write_request ? "write" : "read",
2770 length, offset, result);
2772 __blk_end_request_all(rq, result);
2778 * A queue callback. Ensures we don't create a bio that spans
2779 * multiple osd objects. One exception is single-page bios,
2780 * which we handle later in bio_chain_clone_range().
2782 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2783 struct bio_vec *bvec)
2785 struct rbd_device *rbd_dev = q->queuedata;
2786 sector_t sector_offset;
2787 sector_t sectors_per_obj;
2788 sector_t obj_sector_offset;
2792 * Find the offset of the partition-relative bio start
2793 * sector within the rbd object that encloses it.
2796 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2797 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2798 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2801 * Compute the number of bytes from that offset to the end
2802 * of the object. Account for what's already used by the bio.
2804 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2805 if (ret > bmd->bi_size)
2806 ret -= bmd->bi_size;
2811 * Don't send back more than was asked for. And if the bio
2812 * was empty, let the whole thing through because: "Note
2813 * that a block device *must* allow a single page to be
2814 * added to an empty bio."
2816 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2817 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2818 ret = (int) bvec->bv_len;
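/*
 * A worked example for rbd_merge_bvec() above (illustrative
 * numbers): with 4 MB objects, obj_order is 22 and sectors_per_obj
 * is 8192.  A bio starting at absolute sector 8190 sits 2 sectors
 * short of an object boundary, so at most 1024 bytes (less
 * whatever the bio already contains) may be merged before the
 * request would span two osd objects.
 */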
2823 static void rbd_free_disk(struct rbd_device *rbd_dev)
2825 struct gendisk *disk = rbd_dev->disk;
2830 rbd_dev->disk = NULL;
2831 if (disk->flags & GENHD_FL_UP) {
2834 blk_cleanup_queue(disk->queue);
2839 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2840 const char *object_name,
2841 u64 offset, u64 length,
2842 void *buf, u64 *version)
2845 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2846 struct rbd_obj_request *obj_request;
2847 struct page **pages = NULL;
2852 page_count = (u32) calc_pages_for(offset, length);
2853 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2855 ret = PTR_ERR(pages);
2858 obj_request = rbd_obj_request_create(object_name, offset, length,
2863 obj_request->pages = pages;
2864 obj_request->page_count = page_count;
2866 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2867 if (!obj_request->osd_req)
2870 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2871 offset, length, 0, 0);
2872 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2874 obj_request->length,
2875 obj_request->offset & ~PAGE_MASK,
2877 rbd_osd_req_format_read(obj_request);
2879 ret = rbd_obj_request_submit(osdc, obj_request);
2882 ret = rbd_obj_request_wait(obj_request);
2886 ret = obj_request->result;
2890 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2891 size = (size_t) obj_request->xferred;
2892 ceph_copy_from_page_vector(pages, buf, 0, size);
2893 rbd_assert(size <= (size_t) INT_MAX);
2896 *version = obj_request->version;
2899 rbd_obj_request_put(obj_request);
2901 ceph_release_page_vector(pages, page_count);
2907 * Read the complete header for the given rbd device.
2909 * Returns a pointer to a dynamically-allocated buffer containing
2910 * the complete and validated header. Caller can pass the address
2911 * of a variable that will be filled in with the version of the
2912 * header object at the time it was read.
2914 * Returns a pointer-coded errno if a failure occurs.
2916 static struct rbd_image_header_ondisk *
2917 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2919 struct rbd_image_header_ondisk *ondisk = NULL;
2926 * The complete header will include an array of its 64-bit
2927 * snapshot ids, followed by the names of those snapshots as
2928 * a contiguous block of NUL-terminated strings. Note that
2929 * the number of snapshots could change by the time we read
2930 * it in, in which case we re-read it.
2937 size = sizeof (*ondisk);
2938 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2940 ondisk = kmalloc(size, GFP_KERNEL);
2942 return ERR_PTR(-ENOMEM);
2944 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2945 0, size, ondisk, version);
2948 if ((size_t)ret < size) {
2950 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2954 if (!rbd_dev_ondisk_valid(ondisk)) {
2956 rbd_warn(rbd_dev, "invalid header");
2960 names_size = le64_to_cpu(ondisk->snap_names_len);
2961 want_count = snap_count;
2962 snap_count = le32_to_cpu(ondisk->snap_count);
2963 } while (snap_count != want_count);
2970 return ERR_PTR(ret);
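/*
 * The on-disk v1 header layout assumed by the loop above, in the
 * order it is read (a summary of the size computation, not a new
 * definition):
 *
 *	struct rbd_image_header_ondisk		fixed-size preamble
 *	struct rbd_image_snap_ondisk[snap_count] snapshot id/size pairs
 *	char snap_names[snap_names_len]		NUL-terminated names,
 *						back to back
 */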
2974 * Reload the on-disk header.
2976 static int rbd_read_header(struct rbd_device *rbd_dev,
2977 struct rbd_image_header *header)
2979 struct rbd_image_header_ondisk *ondisk;
2983 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2985 return PTR_ERR(ondisk);
2986 ret = rbd_header_from_disk(header, ondisk);
2992 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2994 struct rbd_snap *snap;
2995 struct rbd_snap *next;
2997 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2998 list_del(&snap->node);
2999 rbd_snap_destroy(snap);
3003 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3005 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3008 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3011 rbd_dev->mapping.size = rbd_dev->header.image_size;
3012 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3013 dout("setting size to %llu sectors", (unsigned long long)size);
3014 set_capacity(rbd_dev->disk, size);
3019 * Re-read the on-disk header and refresh the in-core image metadata.
3021 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3024 struct rbd_image_header h;
3026 ret = rbd_read_header(rbd_dev, &h);
3030 down_write(&rbd_dev->header_rwsem);
3032 /* Update image size, and check for resize of mapped image */
3033 rbd_dev->header.image_size = h.image_size;
3034 rbd_update_mapping_size(rbd_dev);
3036 /* rbd_dev->header.object_prefix shouldn't change */
3037 kfree(rbd_dev->header.snap_sizes);
3038 kfree(rbd_dev->header.snap_names);
3039 /* osd requests may still refer to snapc */
3040 ceph_put_snap_context(rbd_dev->header.snapc);
3042 rbd_dev->header.image_size = h.image_size;
3043 rbd_dev->header.snapc = h.snapc;
3044 rbd_dev->header.snap_names = h.snap_names;
3045 rbd_dev->header.snap_sizes = h.snap_sizes;
3046 /* Free the extra copy of the object prefix */
3047 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3048 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3049 kfree(h.object_prefix);
3051 ret = rbd_dev_snaps_update(rbd_dev);
3053 up_write(&rbd_dev->header_rwsem);
3058 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3063 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3064 image_size = rbd_dev->header.image_size;
3065 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3066 if (rbd_dev->image_format == 1)
3067 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3069 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3070 mutex_unlock(&ctl_mutex);
3072 rbd_warn(rbd_dev, "got notification but failed to"
3073 " update snaps: %d\n", ret);
3074 if (image_size != rbd_dev->header.image_size)
3075 revalidate_disk(rbd_dev->disk);
3080 static int rbd_init_disk(struct rbd_device *rbd_dev)
3082 struct gendisk *disk;
3083 struct request_queue *q;
3086 /* create gendisk info */
3087 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3091 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3093 disk->major = rbd_dev->major;
3094 disk->first_minor = 0;
3095 disk->fops = &rbd_bd_ops;
3096 disk->private_data = rbd_dev;
3098 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3102 /* We use the default size, but let's be explicit about it. */
3103 blk_queue_physical_block_size(q, SECTOR_SIZE);
3105 /* set io sizes to object size */
3106 segment_size = rbd_obj_bytes(&rbd_dev->header);
3107 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3108 blk_queue_max_segment_size(q, segment_size);
3109 blk_queue_io_min(q, segment_size);
3110 blk_queue_io_opt(q, segment_size);
3112 blk_queue_merge_bvec(q, rbd_merge_bvec);
3115 q->queuedata = rbd_dev;
3117 rbd_dev->disk = disk;
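/*
 * For the default 4 MB object size the limits set above work out
 * to max_hw_sectors = 8192 (4 MB of 512-byte sectors), with the
 * maximum segment size and the minimum and optimal I/O sizes all
 * equal to one full object (illustrative arithmetic; the actual
 * value is derived from the image's obj_order).
 */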
3130 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3132 return container_of(dev, struct rbd_device, dev);
3135 static ssize_t rbd_size_show(struct device *dev,
3136 struct device_attribute *attr, char *buf)
3138 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3140 return sprintf(buf, "%llu\n",
3141 (unsigned long long)rbd_dev->mapping.size);
3145 * Note this shows the features for whatever's mapped, which is not
3146 * necessarily the base image.
3148 static ssize_t rbd_features_show(struct device *dev,
3149 struct device_attribute *attr, char *buf)
3151 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3153 return sprintf(buf, "0x%016llx\n",
3154 (unsigned long long)rbd_dev->mapping.features);
3157 static ssize_t rbd_major_show(struct device *dev,
3158 struct device_attribute *attr, char *buf)
3160 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3163 return sprintf(buf, "%d\n", rbd_dev->major);
3165 return sprintf(buf, "(none)\n");
3169 static ssize_t rbd_client_id_show(struct device *dev,
3170 struct device_attribute *attr, char *buf)
3172 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3174 return sprintf(buf, "client%lld\n",
3175 ceph_client_id(rbd_dev->rbd_client->client));
3178 static ssize_t rbd_pool_show(struct device *dev,
3179 struct device_attribute *attr, char *buf)
3181 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3183 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3186 static ssize_t rbd_pool_id_show(struct device *dev,
3187 struct device_attribute *attr, char *buf)
3189 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3191 return sprintf(buf, "%llu\n",
3192 (unsigned long long) rbd_dev->spec->pool_id);
3195 static ssize_t rbd_name_show(struct device *dev,
3196 struct device_attribute *attr, char *buf)
3198 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3200 if (rbd_dev->spec->image_name)
3201 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3203 return sprintf(buf, "(unknown)\n");
3206 static ssize_t rbd_image_id_show(struct device *dev,
3207 struct device_attribute *attr, char *buf)
3209 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3211 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3215 * Shows the name of the currently-mapped snapshot (or
3216 * RBD_SNAP_HEAD_NAME for the base image).
3218 static ssize_t rbd_snap_show(struct device *dev,
3219 struct device_attribute *attr,
3222 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3228 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3229 * for the parent image. If there is no parent, simply shows
3230 * "(no parent image)".
3232 static ssize_t rbd_parent_show(struct device *dev,
3233 struct device_attribute *attr,
3236 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237 struct rbd_spec *spec = rbd_dev->parent_spec;
3242 return sprintf(buf, "(no parent image)\n");
3244 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3245 (unsigned long long) spec->pool_id, spec->pool_name);
3250 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3251 spec->image_name ? spec->image_name : "(unknown)");
3256 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3257 (unsigned long long) spec->snap_id, spec->snap_name);
3262 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3267 return (ssize_t) (bufp - buf);
3270 static ssize_t rbd_image_refresh(struct device *dev,
3271 struct device_attribute *attr,
3275 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3278 ret = rbd_dev_refresh(rbd_dev, NULL);
3280 return ret < 0 ? ret : size;
3283 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3284 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3285 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3286 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3287 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3288 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3289 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3290 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3291 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3292 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3293 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3295 static struct attribute *rbd_attrs[] = {
3296 &dev_attr_size.attr,
3297 &dev_attr_features.attr,
3298 &dev_attr_major.attr,
3299 &dev_attr_client_id.attr,
3300 &dev_attr_pool.attr,
3301 &dev_attr_pool_id.attr,
3302 &dev_attr_name.attr,
3303 &dev_attr_image_id.attr,
3304 &dev_attr_current_snap.attr,
3305 &dev_attr_parent.attr,
3306 &dev_attr_refresh.attr,
3310 static struct attribute_group rbd_attr_group = {
3314 static const struct attribute_group *rbd_attr_groups[] = {
3319 static void rbd_sysfs_dev_release(struct device *dev)
3323 static struct device_type rbd_device_type = {
3325 .groups = rbd_attr_groups,
3326 .release = rbd_sysfs_dev_release,
3329 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3331 kref_get(&spec->kref);
3336 static void rbd_spec_free(struct kref *kref);
3337 static void rbd_spec_put(struct rbd_spec *spec)
3340 kref_put(&spec->kref, rbd_spec_free);
3343 static struct rbd_spec *rbd_spec_alloc(void)
3345 struct rbd_spec *spec;
3347 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3350 kref_init(&spec->kref);
3355 static void rbd_spec_free(struct kref *kref)
3357 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3359 kfree(spec->pool_name);
3360 kfree(spec->image_id);
3361 kfree(spec->image_name);
3362 kfree(spec->snap_name);
3366 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3367 struct rbd_spec *spec)
3369 struct rbd_device *rbd_dev;
3371 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3375 spin_lock_init(&rbd_dev->lock);
3377 INIT_LIST_HEAD(&rbd_dev->node);
3378 INIT_LIST_HEAD(&rbd_dev->snaps);
3379 init_rwsem(&rbd_dev->header_rwsem);
3381 rbd_dev->spec = spec;
3382 rbd_dev->rbd_client = rbdc;
3384 /* Initialize the layout used for all rbd requests */
3386 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3387 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3388 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3389 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3394 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3396 rbd_put_client(rbd_dev->rbd_client);
3397 rbd_spec_put(rbd_dev->spec);
3401 static void rbd_snap_destroy(struct rbd_snap *snap)
3407 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3408 const char *snap_name,
3409 u64 snap_id, u64 snap_size,
3412 struct rbd_snap *snap;
3414 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3416 return ERR_PTR(-ENOMEM);
3418 snap->name = snap_name;
3420 snap->size = snap_size;
3421 snap->features = snap_features;
3427 * Returns a dynamically-allocated snapshot name if successful, or a
3428 * pointer-coded error otherwise.
3430 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3431 u64 *snap_size, u64 *snap_features)
3433 const char *snap_name;
3436 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3438 /* Skip over names until we find the one we are looking for */
3440 snap_name = rbd_dev->header.snap_names;
3441 for (i = 0; i < which; i++)
3442 snap_name += strlen(snap_name) + 1;
3444 snap_name = kstrdup(snap_name, GFP_KERNEL);
3446 return ERR_PTR(-ENOMEM);
3448 *snap_size = rbd_dev->header.snap_sizes[which];
3449 *snap_features = 0; /* No features for v1 */
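/*
 * For example (illustrative data): if header.snap_names holds
 * "one\0two\0three\0", a lookup with which == 2 skips past "one"
 * and "two" above and returns a duplicate of "three".
 */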
3455 * Get the size and object order for an image snapshot, or if
3456 * snap_id is CEPH_NOSNAP, gets this information for the base
3457 * image.
3459 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3460 u8 *order, u64 *snap_size)
3462 __le64 snapid = cpu_to_le64(snap_id);
3467 } __attribute__ ((packed)) size_buf = { 0 };
3469 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3471 &snapid, sizeof (snapid),
3472 &size_buf, sizeof (size_buf), NULL);
3473 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3476 if (ret < sizeof (size_buf))
3480 *order = size_buf.order;
3481 *snap_size = le64_to_cpu(size_buf.size);
3483 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3484 (unsigned long long)snap_id, (unsigned int)*order,
3485 (unsigned long long)*snap_size);
3490 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3492 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3493 &rbd_dev->header.obj_order,
3494 &rbd_dev->header.image_size);
3497 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3503 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3507 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3508 "rbd", "get_object_prefix", NULL, 0,
3509 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3510 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3515 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3516 p + ret, NULL, GFP_NOIO);
3519 if (IS_ERR(rbd_dev->header.object_prefix)) {
3520 ret = PTR_ERR(rbd_dev->header.object_prefix);
3521 rbd_dev->header.object_prefix = NULL;
3523 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3531 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3534 __le64 snapid = cpu_to_le64(snap_id);
3538 } __attribute__ ((packed)) features_buf = { 0 };
3542 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3543 "rbd", "get_features",
3544 &snapid, sizeof (snapid),
3545 &features_buf, sizeof (features_buf), NULL);
3546 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3549 if (ret < sizeof (features_buf))
3552 incompat = le64_to_cpu(features_buf.incompat);
3553 if (incompat & ~RBD_FEATURES_SUPPORTED)
3556 *snap_features = le64_to_cpu(features_buf.features);
3558 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3559 (unsigned long long)snap_id,
3560 (unsigned long long)*snap_features,
3561 (unsigned long long)le64_to_cpu(features_buf.incompat));
3566 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3568 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3569 &rbd_dev->header.features);
3572 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3574 struct rbd_spec *parent_spec;
3576 void *reply_buf = NULL;
3584 parent_spec = rbd_spec_alloc();
3588 size = sizeof (__le64) + /* pool_id */
3589 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3590 sizeof (__le64) + /* snap_id */
3591 sizeof (__le64); /* overlap */
3592 reply_buf = kmalloc(size, GFP_KERNEL);
3598 snapid = cpu_to_le64(CEPH_NOSNAP);
3599 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3600 "rbd", "get_parent",
3601 &snapid, sizeof (snapid),
3602 reply_buf, size, NULL);
3603 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3608 end = reply_buf + ret;
3610 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3611 if (parent_spec->pool_id == CEPH_NOPOOL)
3612 goto out; /* No parent? No problem. */
3614 /* The ceph file layout needs the pool id to fit in 32 bits */
3617 if (parent_spec->pool_id > (u64)U32_MAX) {
3618 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3619 (unsigned long long)parent_spec->pool_id, U32_MAX);
3623 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3624 if (IS_ERR(image_id)) {
3625 ret = PTR_ERR(image_id);
3628 parent_spec->image_id = image_id;
3629 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3630 ceph_decode_64_safe(&p, end, overlap, out_err);
3632 rbd_dev->parent_overlap = overlap;
3633 rbd_dev->parent_spec = parent_spec;
3634 parent_spec = NULL; /* rbd_dev now owns this */
3639 rbd_spec_put(parent_spec);
3644 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3648 __le64 stripe_count;
3649 } __attribute__ ((packed)) striping_info_buf = { 0 };
3650 size_t size = sizeof (striping_info_buf);
3657 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3658 "rbd", "get_stripe_unit_count", NULL, 0,
3659 (char *)&striping_info_buf, size, NULL);
3660 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3667 * We don't actually support the "fancy striping" feature
3668 * (STRIPINGV2) yet, but if the striping sizes are the
3669 * defaults the behavior is the same as before. So find
3670 * out, and only fail if the image has non-default values.
3673 obj_size = (u64)1 << rbd_dev->header.obj_order;
3674 p = &striping_info_buf;
3675 stripe_unit = ceph_decode_64(&p);
3676 if (stripe_unit != obj_size) {
3677 rbd_warn(rbd_dev, "unsupported stripe unit "
3678 "(got %llu want %llu)",
3679 stripe_unit, obj_size);
3682 stripe_count = ceph_decode_64(&p);
3683 if (stripe_count != 1) {
3684 rbd_warn(rbd_dev, "unsupported stripe count "
3685 "(got %llu want 1)", stripe_count);
3688 rbd_dev->header.stripe_unit = stripe_unit;
3689 rbd_dev->header.stripe_count = stripe_count;
3694 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3696 size_t image_id_size;
3701 void *reply_buf = NULL;
3703 char *image_name = NULL;
3706 rbd_assert(!rbd_dev->spec->image_name);
3708 len = strlen(rbd_dev->spec->image_id);
3709 image_id_size = sizeof (__le32) + len;
3710 image_id = kmalloc(image_id_size, GFP_KERNEL);
3715 end = image_id + image_id_size;
3716 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3718 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3719 reply_buf = kmalloc(size, GFP_KERNEL);
3723 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3724 "rbd", "dir_get_name",
3725 image_id, image_id_size,
3726 reply_buf, size, NULL);
3730 end = reply_buf + ret;
3732 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3733 if (IS_ERR(image_name))
3736 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3745 * When an rbd image has a parent image, it is identified by the
3746 * pool, image, and snapshot ids (not names). This function fills
3747 * in the names for those ids. (It's OK if we can't figure out the
3748 * name for an image id, but the pool and snapshot ids should always
3749 * exist and have names.) All names in an rbd spec are dynamically
3750 * allocated.
3752 * When an image being mapped (not a parent) is probed, we have the
3753 * pool name and pool id, image name and image id, and the snapshot
3754 * name. The only thing we're missing is the snapshot id.
3756 * The set of snapshots for an image is not known until they have
3757 * been read by rbd_dev_snaps_update(), so we can't completely fill
3758 * in this information until after that has been called.
3760 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3762 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3763 struct rbd_spec *spec = rbd_dev->spec;
3764 const char *pool_name;
3765 const char *image_name;
3766 const char *snap_name;
3770 * An image being mapped will have the pool name (etc.), but
3771 * we need to look up the snapshot id.
3773 if (spec->pool_name) {
3774 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3775 struct rbd_snap *snap;
3777 snap = snap_by_name(rbd_dev, spec->snap_name);
3780 spec->snap_id = snap->id;
3782 spec->snap_id = CEPH_NOSNAP;
3788 /* Get the pool name; we have to make our own copy of this */
3790 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3792 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3795 pool_name = kstrdup(pool_name, GFP_KERNEL);
3799 /* Fetch the image name; tolerate failure here */
3801 image_name = rbd_dev_image_name(rbd_dev);
3803 rbd_warn(rbd_dev, "unable to get image name");
3805 /* Look up the snapshot name, and make a copy */
3807 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3809 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3813 snap_name = kstrdup(snap_name, GFP_KERNEL);
3819 spec->pool_name = pool_name;
3820 spec->image_name = image_name;
3821 spec->snap_name = snap_name;
3831 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3840 struct ceph_snap_context *snapc;
3844 * We'll need room for the seq value (maximum snapshot id),
3845 * snapshot count, and array of that many snapshot ids.
3846 * For now we have a fixed upper limit on the number we're
3847 * prepared to receive.
3849 size = sizeof (__le64) + sizeof (__le32) +
3850 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3851 reply_buf = kzalloc(size, GFP_KERNEL);
3855 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3856 "rbd", "get_snapcontext", NULL, 0,
3857 reply_buf, size, ver);
3858 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3863 end = reply_buf + ret;
3865 ceph_decode_64_safe(&p, end, seq, out);
3866 ceph_decode_32_safe(&p, end, snap_count, out);
3869 * Make sure the reported number of snapshot ids wouldn't go
3870 * beyond the end of our buffer. But before checking that,
3871 * make sure the computed size of the snapshot context we
3872 * allocate is representable in a size_t.
3874 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3879 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3883 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3889 for (i = 0; i < snap_count; i++)
3890 snapc->snaps[i] = ceph_decode_64(&p);
3892 rbd_dev->header.snapc = snapc;
3894 dout(" snap context seq = %llu, snap_count = %u\n",
3895 (unsigned long long)seq, (unsigned int)snap_count);
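/*
 * The get_snapcontext reply decoded above has this layout (as
 * implied by the decode calls; all fields little-endian):
 *
 *	__le64 seq;			highest snapshot id
 *	__le32 snap_count;		number of ids that follow
 *	__le64 snaps[snap_count];	snapshot ids, highest first
 */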
3902 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3912 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3913 reply_buf = kmalloc(size, GFP_KERNEL);
3915 return ERR_PTR(-ENOMEM);
3917 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3918 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3919 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3920 "rbd", "get_snapshot_name",
3921 &snap_id, sizeof (snap_id),
3922 reply_buf, size, NULL);
3923 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3925 snap_name = ERR_PTR(ret);
3930 end = reply_buf + ret;
3931 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3932 if (IS_ERR(snap_name))
3935 dout(" snap_id 0x%016llx snap_name = %s\n",
3936 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3943 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3944 u64 *snap_size, u64 *snap_features)
3949 const char *snap_name;
3952 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3953 snap_id = rbd_dev->header.snapc->snaps[which];
3954 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3958 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3962 snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3963 if (!IS_ERR(snap_name)) {
3965 *snap_features = features;
3970 return ERR_PTR(ret);
3973 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3974 u64 *snap_size, u64 *snap_features)
3976 if (rbd_dev->image_format == 1)
3977 return rbd_dev_v1_snap_info(rbd_dev, which,
3978 snap_size, snap_features);
3979 if (rbd_dev->image_format == 2)
3980 return rbd_dev_v2_snap_info(rbd_dev, which,
3981 snap_size, snap_features);
3982 return ERR_PTR(-EINVAL);
3985 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3989 down_write(&rbd_dev->header_rwsem);
3991 ret = rbd_dev_v2_image_size(rbd_dev);
3994 rbd_update_mapping_size(rbd_dev);
3996 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3997 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4000 ret = rbd_dev_snaps_update(rbd_dev);
4001 dout("rbd_dev_snaps_update returned %d\n", ret);
4005 up_write(&rbd_dev->header_rwsem);
4011 * Scan the rbd device's current snapshot list and compare it to the
4012 * newly-received snapshot context. Remove any existing snapshots
4013 * not present in the new snapshot context. Add a new snapshot for
4014 * any snapshots in the snapshot context not in the current list.
4015 * And verify there are no changes to snapshots we already know
4016 * about.
4018 * Assumes the snapshots in the snapshot context are sorted by
4019 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4020 * are also maintained in that order.)
4022 * Note that any error that occurs while updating the snapshot list
4023 * aborts the update, and the entire list is cleared. The snapshot
4024 * list becomes inconsistent at that point anyway, so it might as
4025 * well be empty.
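/*
 * Informally, the update below is a merge of two lists that are
 * both sorted in descending snapshot-id order (a summary sketch,
 * not the exact control flow):
 *
 *	while (ids remain in the context || entries remain in list)
 *		entry missing from new context	-> remove entry
 *		id missing from current list	-> create snapshot
 *		present in both			-> verify unchanged
 */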
4027 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4029 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4030 const u32 snap_count = snapc->num_snaps;
4031 struct list_head *head = &rbd_dev->snaps;
4032 struct list_head *links = head->next;
4036 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4037 while (index < snap_count || links != head) {
4039 struct rbd_snap *snap;
4040 const char *snap_name;
4042 u64 snap_features = 0;
4044 snap_id = index < snap_count ? snapc->snaps[index]
4046 snap = links != head ? list_entry(links, struct rbd_snap, node)
4048 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4050 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4051 struct list_head *next = links->next;
4054 * A previously-existing snapshot is not in
4055 * the new snap context.
4057 * If the now-missing snapshot is the one
4058 * the image represents, clear its existence
4059 * flag so we can avoid sending any more
4060 * requests to it.
4062 if (rbd_dev->spec->snap_id == snap->id)
4063 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4064 dout("removing %ssnap id %llu\n",
4065 rbd_dev->spec->snap_id == snap->id ?
4067 (unsigned long long)snap->id);
4069 list_del(&snap->node);
4070 rbd_snap_destroy(snap);
4072 /* Done with this list entry; advance */
4078 snap_name = rbd_dev_snap_info(rbd_dev, index,
4079 &snap_size, &snap_features);
4080 if (IS_ERR(snap_name)) {
4081 ret = PTR_ERR(snap_name);
4082 dout("failed to get snap info, error %d\n", ret);
4086 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4087 (unsigned long long)snap_id);
4088 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4089 struct rbd_snap *new_snap;
4091 /* We haven't seen this snapshot before */
4093 new_snap = rbd_snap_create(rbd_dev, snap_name,
4094 snap_id, snap_size, snap_features);
4095 if (IS_ERR(new_snap)) {
4096 ret = PTR_ERR(new_snap);
4097 dout(" failed to add dev, error %d\n", ret);
4101 /* New goes before existing, or at end of list */
4103 dout(" added dev%s\n", snap ? "" : " at end");
4105 list_add_tail(&new_snap->node, &snap->node);
4107 list_add_tail(&new_snap->node, head);
4109 /* Already have this one */
4111 dout(" already present\n");
4113 rbd_assert(snap->size == snap_size);
4114 rbd_assert(!strcmp(snap->name, snap_name));
4115 rbd_assert(snap->features == snap_features);
4117 /* Done with this list entry; advance */
4119 links = links->next;
4122 /* Advance to the next entry in the snapshot context */
4126 dout("%s: done\n", __func__);
4130 rbd_remove_all_snaps(rbd_dev);
4135 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4140 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4142 dev = &rbd_dev->dev;
4143 dev->bus = &rbd_bus_type;
4144 dev->type = &rbd_device_type;
4145 dev->parent = &rbd_root_dev;
4146 dev->release = rbd_dev_device_release;
4147 dev_set_name(dev, "%d", rbd_dev->dev_id);
4148 ret = device_register(dev);
4150 mutex_unlock(&ctl_mutex);
4155 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4157 device_unregister(&rbd_dev->dev);
4160 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4163 * Get a unique rbd identifier for the given new rbd_dev, and add
4164 * the rbd_dev to the global list. The minimum rbd id is 1.
4166 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4168 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4170 spin_lock(&rbd_dev_list_lock);
4171 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4172 spin_unlock(&rbd_dev_list_lock);
4173 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4174 (unsigned long long) rbd_dev->dev_id);
4178 * Remove an rbd_dev from the global list, and record that its
4179 * identifier is no longer in use.
4181 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4183 struct list_head *tmp;
4184 int rbd_id = rbd_dev->dev_id;
4187 rbd_assert(rbd_id > 0);
4189 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4190 (unsigned long long) rbd_dev->dev_id);
4191 spin_lock(&rbd_dev_list_lock);
4192 list_del_init(&rbd_dev->node);
4195 * If the id being "put" is not the current maximum, there
4196 * is nothing special we need to do.
4198 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4199 spin_unlock(&rbd_dev_list_lock);
4204 * We need to update the current maximum id. Search the
4205 * list to find out what it is. We're more likely to find
4206 * the maximum at the end, so search the list backward.
4209 list_for_each_prev(tmp, &rbd_dev_list) {
4210 struct rbd_device *rbd_dev;
4212 rbd_dev = list_entry(tmp, struct rbd_device, node);
4213 if (rbd_dev->dev_id > max_id)
4214 max_id = rbd_dev->dev_id;
4216 spin_unlock(&rbd_dev_list_lock);
4219 * The max id could have been updated by rbd_dev_id_get(), in
4220 * which case it now accurately reflects the new maximum.
4221 * Be careful not to overwrite the maximum value in that
4222 * case.
4224 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4225 dout(" max dev id has been reset\n");
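/*
 * An example of the race the cmpxchg above tolerates (illustrative
 * ids): suppose id 2 is the maximum and is being put, and the list
 * scan computes a new maximum of 1.  If rbd_dev_id_get() hands out
 * id 3 before the cmpxchg runs, rbd_dev_id_max is 3, the compare
 * against the old value 2 fails, and the newer maximum is kept.
 */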
4229 * Skips over white space at *buf, and updates *buf to point to the
4230 * first found non-space character (if any). Returns the length of
4231 * the token (string of non-white space characters) found. Note
4232 * that *buf must be terminated with '\0'.
4234 static inline size_t next_token(const char **buf)
4237 * These are the characters that produce nonzero for
4238 * isspace() in the "C" and "POSIX" locales.
4240 const char *spaces = " \f\n\r\t\v";
4242 *buf += strspn(*buf, spaces); /* Find start of token */
4244 return strcspn(*buf, spaces); /* Return token length */
4248 * Finds the next token in *buf, and if the provided token buffer is
4249 * big enough, copies the found token into it. The result, if
4250 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4251 * must be terminated with '\0' on entry.
4253 * Returns the length of the token found (not including the '\0').
4254 * Return value will be 0 if no token is found, and it will be >=
4255 * token_size if the token would not fit.
4257 * The *buf pointer will be updated to point beyond the end of the
4258 * found token. Note that this occurs even if the token buffer is
4259 * too small to hold it.
4261 static inline size_t copy_token(const char **buf,
4267 len = next_token(buf);
4268 if (len < token_size) {
4269 memcpy(token, *buf, len);
4270 *(token + len) = '\0';
4278 * Finds the next token in *buf, dynamically allocates a buffer big
4279 * enough to hold a copy of it, and copies the token into the new
4280 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4281 * that a duplicate buffer is created even for a zero-length token.
4283 * Returns a pointer to the newly-allocated duplicate, or a null
4284 * pointer if memory for the duplicate was not available. If
4285 * the lenp argument is a non-null pointer, the length of the token
4286 * (not including the '\0') is returned in *lenp.
4288 * If successful, the *buf pointer will be updated to point beyond
4289 * the end of the found token.
4291 * Note: uses GFP_KERNEL for allocation.
4293 static inline char *dup_token(const char **buf, size_t *lenp)
4298 len = next_token(buf);
4299 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4302 *(dup + len) = '\0';
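/*
 * For example (illustrative input): with *buf pointing at
 * "  rbd myimage", a first dup_token() call returns "rbd" and
 * advances *buf to " myimage"; a second call returns "myimage".
 */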
4312 * Parse the options provided for an "rbd add" (i.e., rbd image
4313 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4314 * and the data written is passed here via a NUL-terminated buffer.
4315 * Returns 0 if successful or an error code otherwise.
4317 * The information extracted from these options is recorded in
4318 * the other parameters, which return dynamically-allocated
4319 * structures:
4320 * ceph_opts
4321 * The address of a pointer that will refer to a ceph options
4322 * structure. Caller must release the returned pointer using
4323 * ceph_destroy_options() when it is no longer needed.
4324 * opts
4325 * Address of an rbd options pointer. Fully initialized by
4326 * this function; caller must release with kfree().
4327 * rbd_spec
4328 * Address of an rbd image specification pointer. Fully
4329 * initialized by this function based on parsed options.
4330 * Caller must release with rbd_spec_put().
4332 * The options passed take this form:
4333 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4334 * where:
4335 * <mon_addrs>
4336 * A comma-separated list of one or more monitor addresses.
4337 * A monitor address is an ip address, optionally followed
4338 * by a port number (separated by a colon).
4339 * I.e.: ip1[:port1][,ip2[:port2]...]
4340 * <options>
4341 * A comma-separated list of ceph and/or rbd options.
4342 * <pool_name>
4343 * The name of the rados pool containing the rbd image.
4344 * <image_name>
4345 * The name of the image in that pool to map.
4346 * <snap_name>
4347 * An optional snapshot name. If provided, the mapping will
4348 * present data from the image at the time that snapshot was
4349 * created. The image head is used if no snapshot name is
4350 * provided. Snapshot mappings are always read-only.
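/*
 * For example, a write like the following (hypothetical monitor
 * address and names) maps snapshot "snap1" of image "myimage" in
 * pool "rbd":
 *
 *	$ echo "1.2.3.4:6789 name=admin rbd myimage snap1" \
 *		> /sys/bus/rbd/add
 */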
4352 static int rbd_add_parse_args(const char *buf,
4353 struct ceph_options **ceph_opts,
4354 struct rbd_options **opts,
4355 struct rbd_spec **rbd_spec)
4359 const char *mon_addrs;
4361 size_t mon_addrs_size;
4362 struct rbd_spec *spec = NULL;
4363 struct rbd_options *rbd_opts = NULL;
4364 struct ceph_options *copts;
4367 /* The first four tokens are required */
4369 len = next_token(&buf);
4371 rbd_warn(NULL, "no monitor address(es) provided");
4375 mon_addrs_size = len + 1;
4379 options = dup_token(&buf, NULL);
4383 rbd_warn(NULL, "no options provided");
4387 spec = rbd_spec_alloc();
4391 spec->pool_name = dup_token(&buf, NULL);
4392 if (!spec->pool_name)
4394 if (!*spec->pool_name) {
4395 rbd_warn(NULL, "no pool name provided");
4399 spec->image_name = dup_token(&buf, NULL);
4400 if (!spec->image_name)
4402 if (!*spec->image_name) {
4403 rbd_warn(NULL, "no image name provided");
4408 * Snapshot name is optional; default is to use "-"
4409 * (indicating the head/no snapshot).
4411 len = next_token(&buf);
4413 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4414 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4415 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4416 ret = -ENAMETOOLONG;
4419 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4422 *(snap_name + len) = '\0';
4423 spec->snap_name = snap_name;
4425 /* Initialize all rbd options to the defaults */
4427 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4431 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4433 copts = ceph_parse_options(options, mon_addrs,
4434 mon_addrs + mon_addrs_size - 1,
4435 parse_rbd_opts_token, rbd_opts);
4436 if (IS_ERR(copts)) {
4437 ret = PTR_ERR(copts);
4458 * An rbd format 2 image has a unique identifier, distinct from the
4459 * name given to it by the user. Internally, that identifier is
4460 * what's used to specify the names of objects related to the image.
4462 * A special "rbd id" object is used to map an rbd image name to its
4463 * id. If that object doesn't exist, then there is no v2 rbd image
4464 * with the supplied name.
4466 * This function will record the given rbd_dev's image_id field if
4467 * it can be determined, and in that case will return 0. If any
4468 * errors occur a negative errno will be returned and the rbd_dev's
4469 * image_id field will be unchanged (and should be NULL).
4471 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4480 * When probing a parent image, the image id is already
4481 * known (and the image name likely is not). There's no
4482 * need to fetch the image id again in this case. We
4483 * do still need to set the image format though.
4485 if (rbd_dev->spec->image_id) {
4486 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4492 * First, see if the format 2 image id file exists, and if
4493 * so, get the image's persistent id from it.
4495 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4496 object_name = kmalloc(size, GFP_NOIO);
4499 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4500 dout("rbd id object name is %s\n", object_name);
4502 /* Response will be an encoded string, which includes a length */
4504 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4505 response = kzalloc(size, GFP_NOIO);
4511 /* If it doesn't exist we'll assume it's a format 1 image */
4513 ret = rbd_obj_method_sync(rbd_dev, object_name,
4514 "rbd", "get_id", NULL, 0,
4515 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4516 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4517 if (ret == -ENOENT) {
4518 image_id = kstrdup("", GFP_KERNEL);
4519 ret = image_id ? 0 : -ENOMEM;
4521 rbd_dev->image_format = 1;
4522 } else if (ret > sizeof (__le32)) {
4525 image_id = ceph_extract_encoded_string(&p, p + ret,
4527 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4529 rbd_dev->image_format = 2;
4535 rbd_dev->spec->image_id = image_id;
4536 dout("image_id is %s\n", image_id);
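/*
 * For example: a format 2 image named "foo" keeps its id in an
 * object named "rbd_id.foo" (assuming RBD_ID_PREFIX is "rbd_id.",
 * per rbd_types.h), which the code above reads with the "get_id"
 * class method.
 */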
4545 /* Undo whatever state changes are made by v1 or v2 image probe */
4547 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4549 struct rbd_image_header *header;
4551 rbd_dev_remove_parent(rbd_dev);
4552 rbd_spec_put(rbd_dev->parent_spec);
4553 rbd_dev->parent_spec = NULL;
4554 rbd_dev->parent_overlap = 0;
4556 /* Free dynamic fields from the header, then zero it out */
4558 header = &rbd_dev->header;
4559 ceph_put_snap_context(header->snapc);
4560 kfree(header->snap_sizes);
4561 kfree(header->snap_names);
4562 kfree(header->object_prefix);
4563 memset(header, 0, sizeof (*header));
4566 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4570 /* Populate rbd image metadata */
4572 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4576 /* Version 1 images have no parent (no layering) */
4578 rbd_dev->parent_spec = NULL;
4579 rbd_dev->parent_overlap = 0;
4581 dout("discovered version 1 image, header name is %s\n",
4582 rbd_dev->header_name);
4587 kfree(rbd_dev->header_name);
4588 rbd_dev->header_name = NULL;
4589 kfree(rbd_dev->spec->image_id);
4590 rbd_dev->spec->image_id = NULL;
4595 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4600 ret = rbd_dev_v2_image_size(rbd_dev);
4604 /* Get the object prefix (a.k.a. block_name) for the image */
4606 ret = rbd_dev_v2_object_prefix(rbd_dev);
4610 /* Get and check the features for the image */
4612 ret = rbd_dev_v2_features(rbd_dev);
4616 /* If the image supports layering, get the parent info */
4618 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4619 ret = rbd_dev_v2_parent_info(rbd_dev);
4624 * Don't print a warning for parent images. We can tell
4625 * at this point because we won't know its pool
4626 * name yet (just its pool id).
4628 if (rbd_dev->spec->pool_name)
4629 rbd_warn(rbd_dev, "WARNING: kernel layering "
4630 "is EXPERIMENTAL!");
4633 /* If the image supports fancy striping, get its parameters */
4635 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4636 ret = rbd_dev_v2_striping_info(rbd_dev);
4641 /* crypto and compression type aren't (yet) supported for v2 images */
4643 rbd_dev->header.crypt_type = 0;
4644 rbd_dev->header.comp_type = 0;
4646 /* Get the snapshot context, plus the header version */
4648 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4652 dout("discovered version 2 image, header name is %s\n",
4653 rbd_dev->header_name);
4657 rbd_dev->parent_overlap = 0;
4658 rbd_spec_put(rbd_dev->parent_spec);
4659 rbd_dev->parent_spec = NULL;
4660 kfree(rbd_dev->header_name);
4661 rbd_dev->header_name = NULL;
4662 kfree(rbd_dev->header.object_prefix);
4663 rbd_dev->header.object_prefix = NULL;
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec;
	struct rbd_client *rbdc;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;
	/*
	 * We need to pass a reference to the client and the parent
	 * spec when creating the parent rbd_dev.  Images related by
	 * parent/child relationships always share both.
	 */
	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
	rbdc = __rbd_get_client(rbd_dev->rbd_client);

	ret = -ENOMEM;
	parent = rbd_dev_create(rbdc, parent_spec);
	if (!parent)
		goto out_err;

	ret = rbd_dev_image_probe(parent);
	if (ret < 0)
		goto out_err;
	rbd_dev->parent = parent;

	return 0;
out_err:
	if (parent) {
		rbd_spec_put(rbd_dev->parent_spec);
		kfree(rbd_dev->header_name);
		rbd_dev->header_name = NULL;
		rbd_dev_destroy(parent);
	} else {
		rbd_put_client(rbdc);
		rbd_spec_put(parent_spec);
	}

	return ret;
}
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		return ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */
	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */
	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/* Everything's ready.  Announce the disk to the world. */
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}
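/*
 * For illustration: with RBD_DRV_NAME "rbd" and ids handed out in
 * increasing order, the first mapped image is named "rbd0" and its
 * disk shows up as /dev/rbd0; since the name ends in a digit, the
 * block layer names any partitions /dev/rbd0p1, /dev/rbd0p2, etc.
 */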
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);

	return 0;
}
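/*
 * For example, given the definitions in rbd_types.h (RBD_SUFFIX is
 * ".rbd" and RBD_HEADER_PREFIX is "rbd_header."), a format 1 image
 * named "foo" keeps its header in the object "foo.rbd", while a
 * format 2 image with a made-up id "1028bd2ae8944a" keeps its
 * header in "rbd_header.1028bd2ae8944a".
 */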
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	int ret;

	rbd_remove_all_snaps(rbd_dev);
	rbd_dev_unprobe(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (!ret)
		return 0;

err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);
err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
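/*
 * For reference, the setup sequence above is: image id, header
 * object name, header watch, format-specific probe, snapshot and
 * spec updates, then the parent probe.  The error labels unwind
 * those steps in reverse, so a failure at any stage releases only
 * what had already been set up.
 */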
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */
	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev) {
		rc = -ENOMEM;	/* rc holds the pool id here; reset it */
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_image_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		/*
		 * rbd_dev_image_release() also destroys the rbd_dev,
		 * so don't fall through to rbd_dev_destroy() below.
		 */
		rbd_dev_image_release(rbd_dev);
		goto err_out_module;
	}

	return count;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}
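/*
 * Illustrative usage, following Documentation/ABI/testing/sysfs-bus-rbd
 * (the monitor address, credentials, pool and image names below are
 * made up):
 *
 *	# echo "192.168.0.1:6789 name=admin,secret=AQB... rbd foo" \
 *		> /sys/bus/rbd/add
 *
 * On success the write consumes the whole buffer and a new device
 * node appears as /dev/rbd<id>.
 */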
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}
static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_clear_mapping(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;
		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
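/*
 * To make the loop above concrete: given a chain of layered images
 *
 *	base (rbd_dev) -> parent -> grandparent
 *
 * the inner walk stops with "second" at the grandparent (the only
 * image with no parent of its own), releases it, and clears the
 * parent pointer and spec of its child.  The outer loop then repeats
 * until the mapped image itself has no parent left.
 */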
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
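/*
 * Illustrative usage: unmapping the device with id 1 (i.e. /dev/rbd1)
 * amounts to
 *
 *	# echo 1 > /sys/bus/rbd/remove
 *
 * which fails with -EBUSY while the device is still held open.
 */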
/*
 * create control files in sysfs
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");