3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
45 #include "rbd_types.h"
47 #define RBD_DEBUG /* Activate rbd_assert() calls */
50 * The basic unit of block I/O is a sector. It is interpreted in a
51 * number of contexts in Linux (blk, bio, genhd), but the default is
52 * universally 512 bytes. These symbols are just slightly more
53 * meaningful than the bare numbers they represent.
55 #define SECTOR_SHIFT 9
56 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
64 #define RBD_MAX_SNAP_NAME_LEN \
65 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_SNAP_HEAD_NAME "-"
71 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX 64
77 #define RBD_OBJ_PREFIX_LEN_MAX 64
81 #define RBD_FEATURE_LAYERING (1<<0)
82 #define RBD_FEATURE_STRIPINGV2 (1<<1)
83 #define RBD_FEATURES_ALL \
84 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
86 /* Features supported by this (client software) implementation. */
88 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
91 * An RBD device name will be "rbd#", where the "rbd" comes from
92 * RBD_DRV_NAME above, and # is a unique integer identifier.
93 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94 * enough to hold all possible device names.
96 #define DEV_NAME_LEN 32
97 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
100 * block device image metadata (in-memory version)
102 struct rbd_image_header {
103 /* These six fields never change for a given rbd image */
110 u64 features; /* Might be changeable someday? */
112 /* The remaining fields need to be updated occasionally */
114 struct ceph_snap_context *snapc;
115 char *snap_names; /* format 1 only */
116 u64 *snap_sizes; /* format 1 only */
120 * An rbd image specification.
122 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
123 * identify an image. Each rbd_dev structure includes a pointer to
124 * an rbd_spec structure that encapsulates this identity.
126 * Each of the id's in an rbd_spec has an associated name. For a
127 * user-mapped image, the names are supplied and the id's associated
128 * with them are looked up. For a layered image, a parent image is
129 * defined by the tuple, and the names are looked up.
131 * An rbd_dev structure contains a parent_spec pointer which is
132 * non-null if the image it represents is a child in a layered
133 * image. This pointer will refer to the rbd_spec structure used
134 * by the parent rbd_dev for its own identity (i.e., the structure
135 * is shared between the parent and child).
137 * Since these structures are populated once, during the discovery
138 * phase of image construction, they are effectively immutable so
139 * we make no effort to synchronize access to them.
141 * Note that code herein does not assume the image name is known (it
142 * could be a null pointer).
146 const char *pool_name;
148 const char *image_id;
149 const char *image_name;
152 const char *snap_name;
158 * an instance of the client. multiple devices may share an rbd client.
161 struct ceph_client *client;
163 struct list_head node;
166 struct rbd_img_request;
167 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
169 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
171 struct rbd_obj_request;
172 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
174 enum obj_request_type {
175 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
179 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
180 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
181 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
182 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
185 struct rbd_obj_request {
186 const char *object_name;
187 u64 offset; /* object start byte */
188 u64 length; /* bytes from offset */
192 * An object request associated with an image will have its
193 * img_data flag set; a standalone object request will not.
195 * A standalone object request will have which == BAD_WHICH
196 * and a null obj_request pointer.
198 * An object request initiated in support of a layered image
199 * object (to check for its existence before a write) will
200 * have which == BAD_WHICH and a non-null obj_request pointer.
202 * Finally, an object request for rbd image data will have
203 * which != BAD_WHICH, and will have a non-null img_request
204 * pointer. The value of which will be in the range
205 * 0..(img_request->obj_request_count-1).
208 struct rbd_obj_request *obj_request; /* STAT op */
210 struct rbd_img_request *img_request;
212 /* links for img_request->obj_requests list */
213 struct list_head links;
216 u32 which; /* posn image request list */
218 enum obj_request_type type;
220 struct bio *bio_list;
226 struct page **copyup_pages;
228 struct ceph_osd_request *osd_req;
230 u64 xferred; /* bytes transferred */
233 rbd_obj_callback_t callback;
234 struct completion completion;
240 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
241 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
242 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
245 struct rbd_img_request {
246 struct rbd_device *rbd_dev;
247 u64 offset; /* starting image byte offset */
248 u64 length; /* byte count from offset */
251 u64 snap_id; /* for reads */
252 struct ceph_snap_context *snapc; /* for writes */
255 struct request *rq; /* block request */
256 struct rbd_obj_request *obj_request; /* obj req initiator */
258 struct page **copyup_pages;
259 spinlock_t completion_lock;/* protects next_completion */
261 rbd_img_callback_t callback;
262 u64 xferred;/* aggregate bytes transferred */
263 int result; /* first nonzero obj_request result */
265 u32 obj_request_count;
266 struct list_head obj_requests; /* rbd_obj_request structs */
271 #define for_each_obj_request(ireq, oreq) \
272 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
273 #define for_each_obj_request_from(ireq, oreq) \
274 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
275 #define for_each_obj_request_safe(ireq, oreq, n) \
276 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
288 int dev_id; /* blkdev unique id */
290 int major; /* blkdev assigned major */
291 struct gendisk *disk; /* blkdev's gendisk and rq */
293 u32 image_format; /* Either 1 or 2 */
294 struct rbd_client *rbd_client;
296 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
298 spinlock_t lock; /* queue, flags, open_count */
300 struct rbd_image_header header;
301 unsigned long flags; /* possibly lock protected */
302 struct rbd_spec *spec;
306 struct ceph_file_layout layout;
308 struct ceph_osd_event *watch_event;
309 struct rbd_obj_request *watch_request;
311 struct rbd_spec *parent_spec;
313 struct rbd_device *parent;
315 /* protects updating the header */
316 struct rw_semaphore header_rwsem;
318 struct rbd_mapping mapping;
320 struct list_head node;
324 unsigned long open_count; /* protected by lock */
328 * Flag bits for rbd_dev->flags. If atomicity is required,
329 * rbd_dev->lock is used to protect access.
331 * Currently, only the "removing" flag (which is coupled with the
332 * "open_count" field) requires atomic access.
335 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
336 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
339 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
341 static LIST_HEAD(rbd_dev_list); /* devices */
342 static DEFINE_SPINLOCK(rbd_dev_list_lock);
344 static LIST_HEAD(rbd_client_list); /* clients */
345 static DEFINE_SPINLOCK(rbd_client_list_lock);
347 /* Slab caches for frequently-allocated structures */
349 static struct kmem_cache *rbd_img_request_cache;
350 static struct kmem_cache *rbd_obj_request_cache;
351 static struct kmem_cache *rbd_segment_name_cache;
353 static int rbd_img_request_submit(struct rbd_img_request *img_request);
355 static void rbd_dev_device_release(struct device *dev);
357 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
359 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
361 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
363 static struct bus_attribute rbd_bus_attrs[] = {
364 __ATTR(add, S_IWUSR, NULL, rbd_add),
365 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
369 static struct bus_type rbd_bus_type = {
371 .bus_attrs = rbd_bus_attrs,
374 static void rbd_root_dev_release(struct device *dev)
378 static struct device rbd_root_dev = {
380 .release = rbd_root_dev_release,
383 static __printf(2, 3)
384 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
386 struct va_format vaf;
394 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
395 else if (rbd_dev->disk)
396 printk(KERN_WARNING "%s: %s: %pV\n",
397 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
398 else if (rbd_dev->spec && rbd_dev->spec->image_name)
399 printk(KERN_WARNING "%s: image %s: %pV\n",
400 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
401 else if (rbd_dev->spec && rbd_dev->spec->image_id)
402 printk(KERN_WARNING "%s: id %s: %pV\n",
403 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
405 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
406 RBD_DRV_NAME, rbd_dev, &vaf);
411 #define rbd_assert(expr) \
412 if (unlikely(!(expr))) { \
413 printk(KERN_ERR "\nAssertion failure in %s() " \
415 "\trbd_assert(%s);\n\n", \
416 __func__, __LINE__, #expr); \
419 #else /* !RBD_DEBUG */
420 # define rbd_assert(expr) ((void) 0)
421 #endif /* !RBD_DEBUG */
423 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
424 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
425 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
427 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
428 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
429 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
430 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
432 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
433 u8 *order, u64 *snap_size);
434 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
436 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
438 static int rbd_open(struct block_device *bdev, fmode_t mode)
440 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
441 bool removing = false;
443 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
446 spin_lock_irq(&rbd_dev->lock);
447 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
450 rbd_dev->open_count++;
451 spin_unlock_irq(&rbd_dev->lock);
455 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
456 (void) get_device(&rbd_dev->dev);
457 set_device_ro(bdev, rbd_dev->mapping.read_only);
458 mutex_unlock(&ctl_mutex);
463 static int rbd_release(struct gendisk *disk, fmode_t mode)
465 struct rbd_device *rbd_dev = disk->private_data;
466 unsigned long open_count_before;
468 spin_lock_irq(&rbd_dev->lock);
469 open_count_before = rbd_dev->open_count--;
470 spin_unlock_irq(&rbd_dev->lock);
471 rbd_assert(open_count_before > 0);
473 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
474 put_device(&rbd_dev->dev);
475 mutex_unlock(&ctl_mutex);
480 static const struct block_device_operations rbd_bd_ops = {
481 .owner = THIS_MODULE,
483 .release = rbd_release,
487 * Initialize an rbd client instance.
490 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
492 struct rbd_client *rbdc;
495 dout("%s:\n", __func__);
496 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
500 kref_init(&rbdc->kref);
501 INIT_LIST_HEAD(&rbdc->node);
503 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
505 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
506 if (IS_ERR(rbdc->client))
508 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
510 ret = ceph_open_session(rbdc->client);
514 spin_lock(&rbd_client_list_lock);
515 list_add_tail(&rbdc->node, &rbd_client_list);
516 spin_unlock(&rbd_client_list_lock);
518 mutex_unlock(&ctl_mutex);
519 dout("%s: rbdc %p\n", __func__, rbdc);
524 ceph_destroy_client(rbdc->client);
526 mutex_unlock(&ctl_mutex);
530 ceph_destroy_options(ceph_opts);
531 dout("%s: error %d\n", __func__, ret);
536 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
538 kref_get(&rbdc->kref);
544 * Find a ceph client with specific addr and configuration. If
545 * found, bump its reference count.
547 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
549 struct rbd_client *client_node;
552 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
555 spin_lock(&rbd_client_list_lock);
556 list_for_each_entry(client_node, &rbd_client_list, node) {
557 if (!ceph_compare_options(ceph_opts, client_node->client)) {
558 __rbd_get_client(client_node);
564 spin_unlock(&rbd_client_list_lock);
566 return found ? client_node : NULL;
576 /* string args above */
579 /* Boolean args above */
583 static match_table_t rbd_opts_tokens = {
585 /* string args above */
586 {Opt_read_only, "read_only"},
587 {Opt_read_only, "ro"}, /* Alternate spelling */
588 {Opt_read_write, "read_write"},
589 {Opt_read_write, "rw"}, /* Alternate spelling */
590 /* Boolean args above */
598 #define RBD_READ_ONLY_DEFAULT false
600 static int parse_rbd_opts_token(char *c, void *private)
602 struct rbd_options *rbd_opts = private;
603 substring_t argstr[MAX_OPT_ARGS];
604 int token, intval, ret;
606 token = match_token(c, rbd_opts_tokens, argstr);
610 if (token < Opt_last_int) {
611 ret = match_int(&argstr[0], &intval);
613 pr_err("bad mount option arg (not int) "
617 dout("got int token %d val %d\n", token, intval);
618 } else if (token > Opt_last_int && token < Opt_last_string) {
619 dout("got string token %d val %s\n", token,
621 } else if (token > Opt_last_string && token < Opt_last_bool) {
622 dout("got Boolean token %d\n", token);
624 dout("got token %d\n", token);
629 rbd_opts->read_only = true;
632 rbd_opts->read_only = false;
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  In the shared case the now-unneeded options
 * are destroyed; in the create case ownership passes to
 * rbd_client_create().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
659 * Destroy ceph client
661 * Caller must hold rbd_client_list_lock.
663 static void rbd_client_release(struct kref *kref)
665 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
667 dout("%s: rbdc %p\n", __func__, rbdc);
668 spin_lock(&rbd_client_list_lock);
669 list_del(&rbdc->node);
670 spin_unlock(&rbd_client_list_lock);
672 ceph_destroy_client(rbdc->client);
677 * Drop reference to ceph client node. If it's not referenced anymore, release
680 static void rbd_put_client(struct rbd_client *rbdc)
683 kref_put(&rbdc->kref, rbd_client_release);
686 static bool rbd_image_format_valid(u32 image_format)
688 return image_format == 1 || image_format == 2;
691 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
696 /* The header has to start with the magic rbd header text */
697 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
700 /* The bio layer requires at least sector-sized I/O */
702 if (ondisk->options.order < SECTOR_SHIFT)
705 /* If we use u64 in a few spots we may be able to loosen this */
707 if (ondisk->options.order > 8 * sizeof (int) - 1)
711 * The size of a snapshot header has to fit in a size_t, and
712 * that limits the number of snapshots.
714 snap_count = le32_to_cpu(ondisk->snap_count);
715 size = SIZE_MAX - sizeof (struct ceph_snap_context);
716 if (snap_count > size / sizeof (__le64))
720 * Not only that, but the size of the entire the snapshot
721 * header must also be representable in a size_t.
723 size -= snap_count * sizeof (__le64);
724 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
731 * Fill an rbd image header with information from the given format 1
734 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
735 struct rbd_image_header_ondisk *ondisk)
737 struct rbd_image_header *header = &rbd_dev->header;
738 bool first_time = header->object_prefix == NULL;
739 struct ceph_snap_context *snapc;
740 char *object_prefix = NULL;
741 char *snap_names = NULL;
742 u64 *snap_sizes = NULL;
748 /* Allocate this now to avoid having to handle failure below */
753 len = strnlen(ondisk->object_prefix,
754 sizeof (ondisk->object_prefix));
755 object_prefix = kmalloc(len + 1, GFP_KERNEL);
758 memcpy(object_prefix, ondisk->object_prefix, len);
759 object_prefix[len] = '\0';
762 /* Allocate the snapshot context and fill it in */
764 snap_count = le32_to_cpu(ondisk->snap_count);
765 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
768 snapc->seq = le64_to_cpu(ondisk->snap_seq);
770 struct rbd_image_snap_ondisk *snaps;
771 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
773 /* We'll keep a copy of the snapshot names... */
775 if (snap_names_len > (u64)SIZE_MAX)
777 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
781 /* ...as well as the array of their sizes. */
783 size = snap_count * sizeof (*header->snap_sizes);
784 snap_sizes = kmalloc(size, GFP_KERNEL);
789 * Copy the names, and fill in each snapshot's id
792 * Note that rbd_dev_v1_header_info() guarantees the
793 * ondisk buffer we're working with has
794 * snap_names_len bytes beyond the end of the
795 * snapshot id array, this memcpy() is safe.
797 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
798 snaps = ondisk->snaps;
799 for (i = 0; i < snap_count; i++) {
800 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
801 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
805 /* We won't fail any more, fill in the header */
807 down_write(&rbd_dev->header_rwsem);
809 header->object_prefix = object_prefix;
810 header->obj_order = ondisk->options.order;
811 header->crypt_type = ondisk->options.crypt_type;
812 header->comp_type = ondisk->options.comp_type;
813 /* The rest aren't used for format 1 images */
814 header->stripe_unit = 0;
815 header->stripe_count = 0;
816 header->features = 0;
818 ceph_put_snap_context(header->snapc);
819 kfree(header->snap_names);
820 kfree(header->snap_sizes);
823 /* The remaining fields always get updated (when we refresh) */
825 header->image_size = le64_to_cpu(ondisk->image_size);
826 header->snapc = snapc;
827 header->snap_names = snap_names;
828 header->snap_sizes = snap_sizes;
830 /* Make sure mapping size is consistent with header info */
832 if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
833 if (rbd_dev->mapping.size != header->image_size)
834 rbd_dev->mapping.size = header->image_size;
836 up_write(&rbd_dev->header_rwsem);
844 ceph_put_snap_context(snapc);
845 kfree(object_prefix);
850 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
852 const char *snap_name;
854 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
856 /* Skip over names until we find the one we are looking for */
858 snap_name = rbd_dev->header.snap_names;
860 snap_name += strlen(snap_name) + 1;
862 return kstrdup(snap_name, GFP_KERNEL);
866 * Snapshot id comparison function for use with qsort()/bsearch().
867 * Note that result is for snapshots in *descending* order.
869 static int snapid_compare_reverse(const void *s1, const void *s2)
871 u64 snap_id1 = *(u64 *)s1;
872 u64 snap_id2 = *(u64 *)s2;
874 if (snap_id1 < snap_id2)
876 return snap_id1 == snap_id2 ? 0 : -1;
880 * Search a snapshot context to see if the given snapshot id is
883 * Returns the position of the snapshot id in the array if it's found,
884 * or BAD_SNAP_INDEX otherwise.
886 * Note: The snapshot array is in kept sorted (by the osd) in
887 * reverse order, highest snapshot id first.
889 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
891 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
894 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
895 sizeof (snap_id), snapid_compare_reverse);
897 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
900 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
905 which = rbd_dev_snap_index(rbd_dev, snap_id);
906 if (which == BAD_SNAP_INDEX)
909 return _rbd_dev_v1_snap_name(rbd_dev, which);
912 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
914 if (snap_id == CEPH_NOSNAP)
915 return RBD_SNAP_HEAD_NAME;
917 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
918 if (rbd_dev->image_format == 1)
919 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
921 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
924 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
927 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
928 if (snap_id == CEPH_NOSNAP) {
929 *snap_size = rbd_dev->header.image_size;
930 } else if (rbd_dev->image_format == 1) {
933 which = rbd_dev_snap_index(rbd_dev, snap_id);
934 if (which == BAD_SNAP_INDEX)
937 *snap_size = rbd_dev->header.snap_sizes[which];
942 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
951 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
954 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
955 if (snap_id == CEPH_NOSNAP) {
956 *snap_features = rbd_dev->header.features;
957 } else if (rbd_dev->image_format == 1) {
958 *snap_features = 0; /* No features for format 1 */
963 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
967 *snap_features = features;
972 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
974 u64 snap_id = rbd_dev->spec->snap_id;
979 ret = rbd_snap_size(rbd_dev, snap_id, &size);
982 ret = rbd_snap_features(rbd_dev, snap_id, &features);
986 rbd_dev->mapping.size = size;
987 rbd_dev->mapping.features = features;
992 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
994 rbd_dev->mapping.size = 0;
995 rbd_dev->mapping.features = 0;
998 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1004 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1007 segment = offset >> rbd_dev->header.obj_order;
1008 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
1009 rbd_dev->header.object_prefix, segment);
1010 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1011 pr_err("error formatting segment name for #%llu (%d)\n",
1020 static void rbd_segment_name_free(const char *name)
1022 /* The explicit cast here is needed to drop the const qualifier */
1024 kmem_cache_free(rbd_segment_name_cache, (void *)name);
1027 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1029 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1031 return offset & (segment_size - 1);
1034 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1035 u64 offset, u64 length)
1037 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1039 offset &= segment_size - 1;
1041 rbd_assert(length <= U64_MAX - offset);
1042 if (offset + length > segment_size)
1043 length = segment_size - offset;
1049 * returns the size of an object in the image
1051 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1053 return 1 << header->obj_order;
1060 static void bio_chain_put(struct bio *chain)
1066 chain = chain->bi_next;
1072 * zeros a bio chain, starting at specific offset
1074 static void zero_bio_chain(struct bio *chain, int start_ofs)
1077 unsigned long flags;
1083 bio_for_each_segment(bv, chain, i) {
1084 if (pos + bv->bv_len > start_ofs) {
1085 int remainder = max(start_ofs - pos, 0);
1086 buf = bvec_kmap_irq(bv, &flags);
1087 memset(buf + remainder, 0,
1088 bv->bv_len - remainder);
1089 bvec_kunmap_irq(buf, &flags);
1094 chain = chain->bi_next;
1099 * similar to zero_bio_chain(), zeros data defined by a page array,
1100 * starting at the given byte offset from the start of the array and
1101 * continuing up to the given end offset. The pages array is
1102 * assumed to be big enough to hold all bytes up to the end.
1104 static void zero_pages(struct page **pages, u64 offset, u64 end)
1106 struct page **page = &pages[offset >> PAGE_SHIFT];
1108 rbd_assert(end > offset);
1109 rbd_assert(end - offset <= (u64)SIZE_MAX);
1110 while (offset < end) {
1113 unsigned long flags;
1116 page_offset = (size_t)(offset & ~PAGE_MASK);
1117 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1118 local_irq_save(flags);
1119 kaddr = kmap_atomic(*page);
1120 memset(kaddr + page_offset, 0, length);
1121 kunmap_atomic(kaddr);
1122 local_irq_restore(flags);
1130 * Clone a portion of a bio, starting at the given byte offset
1131 * and continuing for the number of bytes indicated.
1133 static struct bio *bio_clone_range(struct bio *bio_src,
1134 unsigned int offset,
1142 unsigned short end_idx;
1143 unsigned short vcnt;
1146 /* Handle the easy case for the caller */
1148 if (!offset && len == bio_src->bi_size)
1149 return bio_clone(bio_src, gfpmask);
1151 if (WARN_ON_ONCE(!len))
1153 if (WARN_ON_ONCE(len > bio_src->bi_size))
1155 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1158 /* Find first affected segment... */
1161 __bio_for_each_segment(bv, bio_src, idx, 0) {
1162 if (resid < bv->bv_len)
1164 resid -= bv->bv_len;
1168 /* ...and the last affected segment */
1171 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1172 if (resid <= bv->bv_len)
1174 resid -= bv->bv_len;
1176 vcnt = end_idx - idx + 1;
1178 /* Build the clone */
1180 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1182 return NULL; /* ENOMEM */
1184 bio->bi_bdev = bio_src->bi_bdev;
1185 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1186 bio->bi_rw = bio_src->bi_rw;
1187 bio->bi_flags |= 1 << BIO_CLONED;
1190 * Copy over our part of the bio_vec, then update the first
1191 * and last (or only) entries.
1193 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1194 vcnt * sizeof (struct bio_vec));
1195 bio->bi_io_vec[0].bv_offset += voff;
1197 bio->bi_io_vec[0].bv_len -= voff;
1198 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1200 bio->bi_io_vec[0].bv_len = len;
1203 bio->bi_vcnt = vcnt;
1211 * Clone a portion of a bio chain, starting at the given byte offset
1212 * into the first bio in the source chain and continuing for the
1213 * number of bytes indicated. The result is another bio chain of
1214 * exactly the given length, or a null pointer on error.
1216 * The bio_src and offset parameters are both in-out. On entry they
1217 * refer to the first source bio and the offset into that bio where
1218 * the start of data to be cloned is located.
1220 * On return, bio_src is updated to refer to the bio in the source
1221 * chain that contains first un-cloned byte, and *offset will
1222 * contain the offset of that byte within that bio.
1224 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1225 unsigned int *offset,
1229 struct bio *bi = *bio_src;
1230 unsigned int off = *offset;
1231 struct bio *chain = NULL;
1234 /* Build up a chain of clone bios up to the limit */
1236 if (!bi || off >= bi->bi_size || !len)
1237 return NULL; /* Nothing to clone */
1241 unsigned int bi_size;
1245 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1246 goto out_err; /* EINVAL; ran out of bio's */
1248 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1249 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1251 goto out_err; /* ENOMEM */
1254 end = &bio->bi_next;
1257 if (off == bi->bi_size) {
1268 bio_chain_put(chain);
1274 * The default/initial value for all object request flags is 0. For
1275 * each flag, once its value is set to 1 it is never reset to 0
1278 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1280 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1281 struct rbd_device *rbd_dev;
1283 rbd_dev = obj_request->img_request->rbd_dev;
1284 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1289 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1292 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1295 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1297 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1298 struct rbd_device *rbd_dev = NULL;
1300 if (obj_request_img_data_test(obj_request))
1301 rbd_dev = obj_request->img_request->rbd_dev;
1302 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1307 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1310 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1314 * This sets the KNOWN flag after (possibly) setting the EXISTS
1315 * flag. The latter is set based on the "exists" value provided.
1317 * Note that for our purposes once an object exists it never goes
1318 * away again. It's possible that the response from two existence
1319 * checks are separated by the creation of the target object, and
1320 * the first ("doesn't exist") response arrives *after* the second
1321 * ("does exist"). In that case we ignore the second one.
1323 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1327 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1328 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1332 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1335 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1338 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1341 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1344 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1346 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1347 atomic_read(&obj_request->kref.refcount));
1348 kref_get(&obj_request->kref);
1351 static void rbd_obj_request_destroy(struct kref *kref);
1352 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1354 rbd_assert(obj_request != NULL);
1355 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1356 atomic_read(&obj_request->kref.refcount));
1357 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1360 static void rbd_img_request_get(struct rbd_img_request *img_request)
1362 dout("%s: img %p (was %d)\n", __func__, img_request,
1363 atomic_read(&img_request->kref.refcount));
1364 kref_get(&img_request->kref);
1367 static void rbd_img_request_destroy(struct kref *kref);
/*
 * Drop a reference on an image request; rbd_img_request_destroy()
 * runs when the count reaches zero.
 */
1368 static void rbd_img_request_put(struct rbd_img_request *img_request)
1370 rbd_assert(img_request != NULL);
1371 dout("%s: img %p (was %d)\n", __func__, img_request,
1372 atomic_read(&img_request->kref.refcount));
1373 kref_put(&img_request->kref, rbd_img_request_destroy);
/*
 * Append an object request to its image request's list.  The image
 * request takes over the object's original reference, and "which"
 * records the object's position so completions can be processed in
 * submission order (see rbd_img_obj_callback()).
 */
1376 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1377 struct rbd_obj_request *obj_request)
1379 rbd_assert(obj_request->img_request == NULL);
1381 /* Image request now owns object's original reference */
1382 obj_request->img_request = img_request;
/* Position index doubles as the count of previously added requests */
1383 obj_request->which = img_request->obj_request_count;
1384 rbd_assert(!obj_request_img_data_test(obj_request));
1385 obj_request_img_data_set(obj_request);
1386 rbd_assert(obj_request->which != BAD_WHICH);
1387 img_request->obj_request_count++;
1388 list_add_tail(&obj_request->links, &img_request->obj_requests);
1389 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1390 obj_request->which);
/*
 * Detach an object request from its image request, undoing
 * rbd_img_obj_request_add(), and drop the reference the image
 * request held.  The "which == count" assert implies requests are
 * expected to be removed in reverse (LIFO) order of addition.
 */
1393 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1394 struct rbd_obj_request *obj_request)
1396 rbd_assert(obj_request->which != BAD_WHICH);
1398 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1399 obj_request->which);
1400 list_del(&obj_request->links);
1401 rbd_assert(img_request->obj_request_count > 0);
1402 img_request->obj_request_count--;
1403 rbd_assert(obj_request->which == img_request->obj_request_count);
1404 obj_request->which = BAD_WHICH;
1405 rbd_assert(obj_request_img_data_test(obj_request));
1406 rbd_assert(obj_request->img_request == img_request);
1407 obj_request->img_request = NULL;
1408 obj_request->callback = NULL;
/* Release the reference transferred to the image request at add time */
1409 rbd_obj_request_put(obj_request);
/*
 * Return true only for the recognized object request types listed
 * below (switch statement and return paths are elided in this
 * excerpt).
 */
1412 static bool obj_request_type_valid(enum obj_request_type type)
1415 case OBJ_REQUEST_NODATA:
1416 case OBJ_REQUEST_BIO:
1417 case OBJ_REQUEST_PAGES:
/*
 * Hand the object's prepared OSD request to the OSD client.
 * Returns the result of ceph_osdc_start_request() directly.
 */
1424 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1425 struct rbd_obj_request *obj_request)
1427 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request)
1429 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
/*
 * Finish an image request: on success, sum the per-object transfer
 * counts into img_request->xferred, invoke the completion callback
 * if one is set, then drop the submitter's reference.
 */
1432 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1435 dout("%s: img %p\n", __func__, img_request);
1438 * If no error occurred, compute the aggregate transfer
1439 * count for the image request. We could instead use
1440 * atomic64_cmpxchg() to update it as each object request
1441 * completes; not clear which way is better off hand.
1443 if (!img_request->result) {
1444 struct rbd_obj_request *obj_request;
1447 for_each_obj_request(img_request, obj_request)
1448 xferred += obj_request->xferred;
1449 img_request->xferred = xferred;
1452 if (img_request->callback)
1453 img_request->callback(img_request);
/* May free img_request if this was the last reference */
1455 rbd_img_request_put(img_request);
1458 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
/*
 * Block (interruptibly) until the object request completes.
 * Returns 0 on completion or -ERESTARTSYS if interrupted.
 */
1460 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1462 dout("%s: obj %p\n", __func__, obj_request);
1464 return wait_for_completion_interruptible(&obj_request->completion);
1468 * The default/initial value for all image request flags is 0. Each
1469 * is conditionally set to 1 at image request initialization time
1470 * and currently never change thereafter.
/* Mark the image request as a write (vs. read) request. */
1472 static void img_request_write_set(struct rbd_img_request *img_request)
1474 set_bit(IMG_REQ_WRITE, &img_request->flags);
/* Return true if this image request is a write request. */
1478 static bool img_request_write_test(struct rbd_img_request *img_request)
1481 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
/* Mark the image request as a child (parent-read) request. */
1484 static void img_request_child_set(struct rbd_img_request *img_request)
1486 set_bit(IMG_REQ_CHILD, &img_request->flags);
/* Return true if this image request is a child request. */
1490 static bool img_request_child_test(struct rbd_img_request *img_request)
1493 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
/* Mark the image request as targeting a layered (cloned) image. */
1496 static void img_request_layered_set(struct rbd_img_request *img_request)
1498 set_bit(IMG_REQ_LAYERED, &img_request->flags);
/* Return true if this image request targets a layered image. */
1502 static bool img_request_layered_test(struct rbd_img_request *img_request)
1505 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Completion handling for a read that is part of an image request:
 * an ENOENT result (a hole) zero-fills the whole request and clears
 * the error; a short read zero-fills from the transfer boundary to
 * the end.  In both cases xferred is bumped to the full length, then
 * the request is marked done.
 */
1509 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1511 u64 xferred = obj_request->xferred;
1512 u64 length = obj_request->length;
1514 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1515 obj_request, obj_request->img_request, obj_request->result,
1518 * ENOENT means a hole in the image. We zero-fill the
1519 * entire length of the request. A short read also implies
1520 * zero-fill to the end of the request. Either way we
1521 * update the xferred count to indicate the whole request
1524 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1525 if (obj_request->result == -ENOENT) {
/* Hole: zero the entire extent and report success */
1526 if (obj_request->type == OBJ_REQUEST_BIO)
1527 zero_bio_chain(obj_request->bio_list, 0);
1529 zero_pages(obj_request->pages, 0, length);
1530 obj_request->result = 0;
1531 obj_request->xferred = length;
1532 } else if (xferred < length && !obj_request->result) {
/* Successful short read: zero only the unread tail */
1533 if (obj_request->type == OBJ_REQUEST_BIO)
1534 zero_bio_chain(obj_request->bio_list, xferred);
1536 zero_pages(obj_request->pages, xferred, length);
1537 obj_request->xferred = length;
1539 obj_request_done_set(obj_request);
/*
 * Invoke the request's completion callback (if any), then wake all
 * waiters blocked in rbd_obj_request_wait().
 */
1542 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1544 dout("%s: obj %p cb %p\n", __func__, obj_request,
1545 obj_request->callback);
1546 if (obj_request->callback)
1547 obj_request->callback(obj_request);
1549 complete_all(&obj_request->completion);
/* Trivial op callback: nothing to do beyond marking the request done. */
1552 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1554 dout("%s: obj %p\n", __func__, obj_request);
1555 obj_request_done_set(obj_request);
/*
 * OSD read completion.  For a layered image, an ENOENT on an object
 * whose offset falls inside the parent overlap means the data must
 * be read from the parent image instead.  Otherwise image-request
 * reads go through the zero-fill read callback; a standalone read is
 * simply marked done.
 */
1558 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1560 struct rbd_img_request *img_request = NULL;
1561 struct rbd_device *rbd_dev = NULL;
1562 bool layered = false;
1564 if (obj_request_img_data_test(obj_request)) {
1565 img_request = obj_request->img_request;
1566 layered = img_request && img_request_layered_test(img_request);
1567 rbd_dev = img_request->rbd_dev;
1570 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1571 obj_request, img_request, obj_request->result,
1572 obj_request->xferred, obj_request->length);
/* Object missing but covered by the parent: redirect to parent read */
1573 if (layered && obj_request->result == -ENOENT &&
1574 obj_request->img_offset < rbd_dev->parent_overlap)
1575 rbd_img_parent_read(obj_request);
1576 else if (img_request)
1577 rbd_img_obj_request_read_callback(obj_request);
1579 obj_request_done_set(obj_request);
/*
 * OSD write completion: force xferred to the requested length (no
 * successful short writes) and mark the request done.
 */
1582 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1584 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1585 obj_request->result, obj_request->length);
1587 * There is no such thing as a successful short write. Set
1588 * it to our originally-requested length.
1590 obj_request->xferred = obj_request->length;
1591 obj_request_done_set(obj_request);
1595 * For a simple stat call there's nothing to do. We'll do more if
1596 * this is part of a write sequence for a layered image.
/* STAT op completion: just mark the request done. */
1598 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1600 dout("%s: obj %p\n", __func__, obj_request);
1601 obj_request_done_set(obj_request);
/*
 * Top-level OSD request completion handler.  Records the result and
 * transfer count from the first op, dispatches to the per-opcode
 * callback, and completes the object request once it is marked done
 * (parent-read redirection may leave it not-done here).
 */
1604 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1605 struct ceph_msg *msg)
1607 struct rbd_obj_request *obj_request = osd_req->r_priv;
1610 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1611 rbd_assert(osd_req == obj_request->osd_req);
1612 if (obj_request_img_data_test(obj_request)) {
1613 rbd_assert(obj_request->img_request);
1614 rbd_assert(obj_request->which != BAD_WHICH);
1616 rbd_assert(obj_request->which == BAD_WHICH);
/* Only record failures; success leaves any earlier result intact */
1619 if (osd_req->r_result < 0)
1620 obj_request->result = osd_req->r_result;
/* Requests built here carry at most two ops (copyup + write) */
1622 BUG_ON(osd_req->r_num_ops > 2);
1625 * We support a 64-bit length, but ultimately it has to be
1626 * passed to blk_end_request(), which takes an unsigned int.
1628 obj_request->xferred = osd_req->r_reply_op_len[0];
1629 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
/* Dispatch on the opcode of the first (primary) op */
1630 opcode = osd_req->r_ops[0].op;
1632 case CEPH_OSD_OP_READ:
1633 rbd_osd_read_callback(obj_request);
1635 case CEPH_OSD_OP_WRITE:
1636 rbd_osd_write_callback(obj_request);
1638 case CEPH_OSD_OP_STAT:
1639 rbd_osd_stat_callback(obj_request);
1641 case CEPH_OSD_OP_CALL:
1642 case CEPH_OSD_OP_NOTIFY_ACK:
1643 case CEPH_OSD_OP_WATCH:
1644 rbd_osd_trivial_callback(obj_request);
1647 rbd_warn(NULL, "%s: unsupported op %hu\n",
1648 obj_request->object_name, (unsigned short) opcode);
/* Not done here means completion was handed off (parent read) */
1652 if (obj_request_done_test(obj_request))
1653 rbd_obj_request_complete(obj_request);
/*
 * Finalize a read OSD request: build it with the image's snapshot id
 * (CEPH_NOSNAP for a standalone object request), no snap context.
 */
1656 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1658 struct rbd_img_request *img_request = obj_request->img_request;
1659 struct ceph_osd_request *osd_req = obj_request->osd_req;
1662 rbd_assert(osd_req != NULL);
1664 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1665 ceph_osdc_build_request(osd_req, obj_request->offset,
1666 NULL, snap_id, NULL);
/*
 * Finalize a write OSD request: build it with the image's snapshot
 * context (NULL for a standalone request) and the current time as
 * the modification time.
 */
1669 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1671 struct rbd_img_request *img_request = obj_request->img_request;
1672 struct ceph_osd_request *osd_req = obj_request->osd_req;
1673 struct ceph_snap_context *snapc;
1674 struct timespec mtime = CURRENT_TIME;
1676 rbd_assert(osd_req != NULL);
1678 snapc = img_request ? img_request->snapc : NULL;
1679 ceph_osdc_build_request(osd_req, obj_request->offset,
1680 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate a single-op OSD request for the given object request,
 * wiring up the rbd callback, object name, and file layout.  Writes
 * get the image's snap context and WRITE|ONDISK flags; reads get
 * READ.  Returns NULL on allocation failure.
 */
1683 static struct ceph_osd_request *rbd_osd_req_create(
1684 struct rbd_device *rbd_dev,
1686 struct rbd_obj_request *obj_request)
1688 struct ceph_snap_context *snapc = NULL;
1689 struct ceph_osd_client *osdc;
1690 struct ceph_osd_request *osd_req;
1692 if (obj_request_img_data_test(obj_request)) {
1693 struct rbd_img_request *img_request = obj_request->img_request;
/* The caller's write flag must agree with the image request's */
1695 rbd_assert(write_request ==
1696 img_request_write_test(img_request));
1698 snapc = img_request->snapc;
1701 /* Allocate and initialize the request, for the single op */
1703 osdc = &rbd_dev->rbd_client->client->osdc;
1704 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1706 return NULL; /* ENOMEM */
1709 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1711 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1713 osd_req->r_callback = rbd_osd_req_callback;
1714 osd_req->r_priv = obj_request;
/* Copy the object name into the request's fixed-size oid buffer */
1716 osd_req->r_oid_len = strlen(obj_request->object_name);
1717 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1718 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1720 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1726 * Create a copyup osd request based on the information in the
1727 * object request supplied. A copyup request has two osd ops,
1728 * a copyup method call, and a "normal" write request.
/*
 * Only valid for object requests that belong to a write image
 * request.  Returns NULL on allocation failure.
 */
1730 static struct ceph_osd_request *
1731 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1733 struct rbd_img_request *img_request;
1734 struct ceph_snap_context *snapc;
1735 struct rbd_device *rbd_dev;
1736 struct ceph_osd_client *osdc;
1737 struct ceph_osd_request *osd_req;
1739 rbd_assert(obj_request_img_data_test(obj_request));
1740 img_request = obj_request->img_request;
1741 rbd_assert(img_request);
1742 rbd_assert(img_request_write_test(img_request));
1744 /* Allocate and initialize the request, for the two ops */
1746 snapc = img_request->snapc;
1747 rbd_dev = img_request->rbd_dev;
1748 osdc = &rbd_dev->rbd_client->client->osdc;
1749 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1751 return NULL; /* ENOMEM */
/* Copyup is always a write; mirror rbd_osd_req_create()'s setup */
1753 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1754 osd_req->r_callback = rbd_osd_req_callback;
1755 osd_req->r_priv = obj_request;
1757 osd_req->r_oid_len = strlen(obj_request->object_name);
1758 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1759 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1761 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Release a reference on (and possibly free) an OSD request. */
1767 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1769 ceph_osdc_put_request(osd_req);
1772 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate and initialize an object request for the given byte range
 * of the named object.  The object name is duplicated so the request
 * owns its own copy; the caller's reference is released via
 * rbd_obj_request_put().  Returns the new request, or (per the
 * elided error paths) NULL on allocation failure.
 */
1774 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1775 u64 offset, u64 length,
1776 enum obj_request_type type)
1778 struct rbd_obj_request *obj_request;
1782 rbd_assert(obj_request_type_valid(type));
/* Private copy of the name, including the terminating NUL */
1784 size = strlen(object_name) + 1;
1785 name = kmalloc(size, GFP_KERNEL);
1789 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1795 obj_request->object_name = memcpy(name, object_name, size);
1796 obj_request->offset = offset;
1797 obj_request->length = length;
1798 obj_request->flags = 0;
1799 obj_request->which = BAD_WHICH;
1800 obj_request->type = type;
1801 INIT_LIST_HEAD(&obj_request->links);
1802 init_completion(&obj_request->completion);
1803 kref_init(&obj_request->kref);
1805 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1806 offset, length, (int)type, obj_request);
/*
 * kref release function for object requests: frees the OSD request,
 * any attached data (bio chain or page vector), the object name
 * copy, and finally the request itself.  Must only run after the
 * request has been detached from any image request.
 */
1811 static void rbd_obj_request_destroy(struct kref *kref)
1813 struct rbd_obj_request *obj_request;
1815 obj_request = container_of(kref, struct rbd_obj_request, kref);
1817 dout("%s: obj %p\n", __func__, obj_request);
1819 rbd_assert(obj_request->img_request == NULL);
1820 rbd_assert(obj_request->which == BAD_WHICH);
1822 if (obj_request->osd_req)
1823 rbd_osd_req_destroy(obj_request->osd_req);
/* Release attached data according to the request type */
1825 rbd_assert(obj_request_type_valid(obj_request->type));
1826 switch (obj_request->type) {
1827 case OBJ_REQUEST_NODATA:
1828 break; /* Nothing to do */
1829 case OBJ_REQUEST_BIO:
1830 if (obj_request->bio_list)
1831 bio_chain_put(obj_request->bio_list);
1833 case OBJ_REQUEST_PAGES:
1834 if (obj_request->pages)
1835 ceph_release_page_vector(obj_request->pages,
1836 obj_request->page_count);
1840 kfree(obj_request->object_name);
1841 obj_request->object_name = NULL;
1842 kmem_cache_free(rbd_obj_request_cache, obj_request);
1846 * Caller is responsible for filling in the list of object requests
1847 * that comprises the image request, and the Linux request pointer
1848 * (if there is one).
/*
 * Allocate and initialize an image request for a byte range of the
 * image.  Writes capture the device's current snapshot context
 * (taken under header_rwsem); reads record the mapped snapshot id.
 * Returns the new request, or (per elided error paths) NULL on
 * allocation failure.
 */
1850 static struct rbd_img_request *rbd_img_request_create(
1851 struct rbd_device *rbd_dev,
1852 u64 offset, u64 length,
1856 struct rbd_img_request *img_request;
1858 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
/* Pin the snap context under the header lock before publishing it */
1862 if (write_request) {
1863 down_read(&rbd_dev->header_rwsem);
1864 ceph_get_snap_context(rbd_dev->header.snapc);
1865 up_read(&rbd_dev->header_rwsem);
1868 img_request->rq = NULL;
1869 img_request->rbd_dev = rbd_dev;
1870 img_request->offset = offset;
1871 img_request->length = length;
1872 img_request->flags = 0;
1873 if (write_request) {
1874 img_request_write_set(img_request);
1875 img_request->snapc = rbd_dev->header.snapc;
1877 img_request->snap_id = rbd_dev->spec->snap_id;
1880 img_request_child_set(img_request);
/* Only clones (images with a parent) are treated as layered */
1881 if (rbd_dev->parent_spec)
1882 img_request_layered_set(img_request);
1883 spin_lock_init(&img_request->completion_lock);
1884 img_request->next_completion = 0;
1885 img_request->callback = NULL;
1886 img_request->result = 0;
1887 img_request->obj_request_count = 0;
1888 INIT_LIST_HEAD(&img_request->obj_requests);
1889 kref_init(&img_request->kref);
1891 rbd_img_request_get(img_request); /* Avoid a warning */
1892 rbd_img_request_put(img_request); /* TEMPORARY */
1894 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1895 write_request ? "write" : "read", offset, length,
/*
 * kref release function for image requests: detaches and drops every
 * remaining object request, releases the write snap context and (for
 * child requests) the originating object request, then frees the
 * image request.
 */
1901 static void rbd_img_request_destroy(struct kref *kref)
1903 struct rbd_img_request *img_request;
1904 struct rbd_obj_request *obj_request;
1905 struct rbd_obj_request *next_obj_request;
1907 img_request = container_of(kref, struct rbd_img_request, kref);
1909 dout("%s: img %p\n", __func__, img_request);
1911 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1912 rbd_img_obj_request_del(img_request, obj_request);
1913 rbd_assert(img_request->obj_request_count == 0);
/* Writes pinned the snap context in rbd_img_request_create() */
1915 if (img_request_write_test(img_request))
1916 ceph_put_snap_context(img_request->snapc);
1918 if (img_request_child_test(img_request))
1919 rbd_obj_request_put(img_request->obj_request);
1921 kmem_cache_free(rbd_img_request_cache, img_request);
/*
 * Account one completed object request to its image request: log any
 * error (recording the first one on the image request), then notify
 * completion upstream — for a child request by position, otherwise by
 * ending the corresponding portion of the block-layer request.
 * Returns true while more of the image request remains outstanding.
 */
1924 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1926 struct rbd_img_request *img_request;
1927 unsigned int xferred;
1931 rbd_assert(obj_request_img_data_test(obj_request));
1932 img_request = obj_request->img_request;
/* xferred must fit the unsigned int blk_end_request() expects */
1934 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1935 xferred = (unsigned int)obj_request->xferred;
1936 result = obj_request->result;
1938 struct rbd_device *rbd_dev = img_request->rbd_dev;
1940 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1941 img_request_write_test(img_request) ? "write" : "read",
1942 obj_request->length, obj_request->img_offset,
1943 obj_request->offset);
1944 rbd_warn(rbd_dev, " result %d xferred %x\n",
/* Only the first error is preserved on the image request */
1946 if (!img_request->result)
1947 img_request->result = result;
1950 /* Image object requests don't own their page array */
1952 if (obj_request->type == OBJ_REQUEST_PAGES) {
1953 obj_request->pages = NULL;
1954 obj_request->page_count = 0;
1957 if (img_request_child_test(img_request)) {
1958 rbd_assert(img_request->obj_request != NULL);
1959 more = obj_request->which < img_request->obj_request_count - 1;
1961 rbd_assert(img_request->rq != NULL);
1962 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Per-object completion callback for image requests.  Completions
 * must be reported in submission order, so this only advances when
 * the completing request is the next expected one, then walks
 * forward ending every consecutive already-done request under
 * completion_lock.  When nothing remains, the whole image request is
 * completed.
 */
1968 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1970 struct rbd_img_request *img_request;
1971 u32 which = obj_request->which;
1974 rbd_assert(obj_request_img_data_test(obj_request));
1975 img_request = obj_request->img_request;
1977 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1978 rbd_assert(img_request != NULL);
1979 rbd_assert(img_request->obj_request_count > 0);
1980 rbd_assert(which != BAD_WHICH);
1981 rbd_assert(which < img_request->obj_request_count);
1982 rbd_assert(which >= img_request->next_completion);
1984 spin_lock_irq(&img_request->completion_lock);
/* Out-of-order completion: leave it for the in-order pass */
1985 if (which != img_request->next_completion)
1988 for_each_obj_request_from(img_request, obj_request) {
1990 rbd_assert(which < img_request->obj_request_count);
1992 if (!obj_request_done_test(obj_request))
1994 more = rbd_img_obj_end_request(obj_request);
/* more is false exactly when the last request was just ended */
1998 rbd_assert(more ^ (which == img_request->obj_request_count));
1999 img_request->next_completion = which;
2001 spin_unlock_irq(&img_request->completion_lock);
2004 rbd_img_request_complete(img_request);
2008 * Split up an image request into one or more object requests, each
2009 * to a different object. The "type" parameter indicates whether
2010 * "data_desc" is the pointer to the head of a list of bio
2011 * structures, or the base of a page array. In either case this
2012 * function assumes data_desc describes memory sufficient to hold
2013 * all data described by the image request.
/*
 * Returns 0 on success; on failure (paths elided here) outstanding
 * object requests are released before returning an error.
 */
2015 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2016 enum obj_request_type type,
2019 struct rbd_device *rbd_dev = img_request->rbd_dev;
2020 struct rbd_obj_request *obj_request = NULL;
2021 struct rbd_obj_request *next_obj_request;
2022 bool write_request = img_request_write_test(img_request);
2023 struct bio *bio_list;
2024 unsigned int bio_offset = 0;
2025 struct page **pages;
2030 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2031 (int)type, data_desc);
2033 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2034 img_offset = img_request->offset;
2035 resid = img_request->length;
2036 rbd_assert(resid > 0);
/* data_desc is a bio chain or a page array, depending on type */
2038 if (type == OBJ_REQUEST_BIO) {
2039 bio_list = data_desc;
2040 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2042 rbd_assert(type == OBJ_REQUEST_PAGES);
/* One iteration per RADOS object touched by the image extent */
2047 struct ceph_osd_request *osd_req;
2048 const char *object_name;
2052 object_name = rbd_segment_name(rbd_dev, img_offset);
2055 offset = rbd_segment_offset(rbd_dev, img_offset);
2056 length = rbd_segment_length(rbd_dev, img_offset, resid);
2057 obj_request = rbd_obj_request_create(object_name,
2058 offset, length, type);
2059 /* object request has its own copy of the object name */
2060 rbd_segment_name_free(object_name);
/* Attach this object's slice of the caller's data */
2064 if (type == OBJ_REQUEST_BIO) {
2065 unsigned int clone_size;
2067 rbd_assert(length <= (u64)UINT_MAX);
2068 clone_size = (unsigned int)length;
2069 obj_request->bio_list =
2070 bio_chain_clone_range(&bio_list,
2074 if (!obj_request->bio_list)
2077 unsigned int page_count;
2079 obj_request->pages = pages;
2080 page_count = (u32)calc_pages_for(offset, length);
2081 obj_request->page_count = page_count;
/* A partial final page is shared with the next object */
2082 if ((offset + length) & ~PAGE_MASK)
2083 page_count--; /* more on last page */
2084 pages += page_count;
2087 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2091 obj_request->osd_req = osd_req;
2092 obj_request->callback = rbd_img_obj_callback;
2094 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2096 if (type == OBJ_REQUEST_BIO)
2097 osd_req_op_extent_osd_data_bio(osd_req, 0,
2098 obj_request->bio_list, length);
2100 osd_req_op_extent_osd_data_pages(osd_req, 0,
2101 obj_request->pages, length,
2102 offset & ~PAGE_MASK, false, false);
2105 rbd_osd_req_format_write(obj_request);
2107 rbd_osd_req_format_read(obj_request);
2109 obj_request->img_offset = img_offset;
2110 rbd_img_obj_request_add(img_request, obj_request);
2112 img_offset += length;
/* Error cleanup: drop the partially built request, then the rest */
2119 rbd_obj_request_put(obj_request);
2121 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2122 rbd_obj_request_put(obj_request);
/*
 * Completion callback for a copyup request.  Releases the page
 * vector that held the parent data (sized to a full object, per
 * obj_order), fixes up xferred for the original write, and finishes
 * via the normal image object callback.
 */
2128 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2130 struct rbd_img_request *img_request;
2131 struct rbd_device *rbd_dev;
2135 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2136 rbd_assert(obj_request_img_data_test(obj_request));
2137 img_request = obj_request->img_request;
2138 rbd_assert(img_request);
2140 rbd_dev = img_request->rbd_dev;
2141 rbd_assert(rbd_dev);
/* Copyup pages always cover one full object */
2142 length = (u64)1 << rbd_dev->header.obj_order;
2143 page_count = (u32)calc_pages_for(0, length);
2145 rbd_assert(obj_request->copyup_pages);
2146 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2147 obj_request->copyup_pages = NULL;
2150 * We want the transfer count to reflect the size of the
2151 * original write request. There is no such thing as a
2152 * successful short write, so if the request was successful
2153 * we can just set it to the originally-requested length.
2155 if (!obj_request->result)
2156 obj_request->xferred = obj_request->length;
2158 /* Finish up with the normal image object callback */
2160 rbd_img_obj_callback(obj_request);
/*
 * Completion callback for the parent full-object read issued by
 * rbd_img_obj_parent_read_full().  Takes ownership of the copyup
 * pages from the child image request, builds a two-op copyup+write
 * OSD request for the original object request, and submits it.  On
 * any failure (paths elided) the error is recorded on the original
 * request and it is completed.
 */
2164 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2166 struct rbd_obj_request *orig_request;
2167 struct ceph_osd_request *osd_req;
2168 struct ceph_osd_client *osdc;
2169 struct rbd_device *rbd_dev;
2170 struct page **pages;
2175 rbd_assert(img_request_child_test(img_request));
2177 /* First get what we need from the image request */
2179 pages = img_request->copyup_pages;
2180 rbd_assert(pages != NULL);
2181 img_request->copyup_pages = NULL;
2183 orig_request = img_request->obj_request;
2184 rbd_assert(orig_request != NULL);
2185 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2186 result = img_request->result;
2187 obj_size = img_request->length;
2188 xferred = img_request->xferred;
/* Done with the child image request; drop its reference */
2189 rbd_img_request_put(img_request);
2191 rbd_assert(orig_request->img_request);
2192 rbd_dev = orig_request->img_request->rbd_dev;
2193 rbd_assert(rbd_dev);
2194 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2199 /* Allocate the new copyup osd request for the original request */
2202 rbd_assert(!orig_request->osd_req);
2203 osd_req = rbd_osd_req_create_copyup(orig_request);
2206 orig_request->osd_req = osd_req;
2207 orig_request->copyup_pages = pages;
2209 /* Initialize the copyup op */
2211 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2212 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2215 /* Then the original write request op */
2217 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2218 orig_request->offset,
2219 orig_request->length, 0, 0);
2220 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2221 orig_request->length);
2223 rbd_osd_req_format_write(orig_request);
2225 /* All set, send it off. */
2227 orig_request->callback = rbd_img_obj_copyup_callback;
2228 osdc = &rbd_dev->rbd_client->client->osdc;
2229 result = rbd_obj_request_submit(osdc, orig_request);
2233 /* Record the error code and complete the request */
2235 orig_request->result = result;
2236 orig_request->xferred = 0;
2237 obj_request_done_set(orig_request);
2238 rbd_obj_request_complete(orig_request);
2242 * Read from the parent image the range of data that covers the
2243 * entire target of the given object request. This is used for
2244 * satisfying a layered image write request when the target of an
2245 * object request from the image request does not exist.
2247 * A page array big enough to hold the returned data is allocated
2248 * and supplied to rbd_img_request_fill() as the "data descriptor."
2249 * When the read completes, this page array will be transferred to
2250 * the original object request for the copyup operation.
2252 * If an error occurs, record it as the result of the original
2253 * object request and mark it done so it gets completed.
2255 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2257 struct rbd_img_request *img_request = NULL;
2258 struct rbd_img_request *parent_request = NULL;
2259 struct rbd_device *rbd_dev;
2262 struct page **pages = NULL;
2266 rbd_assert(obj_request_img_data_test(obj_request));
2267 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2269 img_request = obj_request->img_request;
2270 rbd_assert(img_request != NULL);
2271 rbd_dev = img_request->rbd_dev;
2272 rbd_assert(rbd_dev->parent != NULL);
2275 * First things first. The original osd request is of no
2276 * use to use any more, we'll need a new one that can hold
2277 * the two ops in a copyup request. We'll get that later,
2278 * but for now we can release the old one.
2280 rbd_osd_req_destroy(obj_request->osd_req);
2281 obj_request->osd_req = NULL;
2284 * Determine the byte range covered by the object in the
2285 * child image to which the original request was to be sent.
2287 img_offset = obj_request->img_offset - obj_request->offset;
/* Read a full object's worth of parent data (obj_order bytes) */
2288 length = (u64)1 << rbd_dev->header.obj_order;
2291 * There is no defined parent data beyond the parent
2292 * overlap, so limit what we read at that boundary if
2295 if (img_offset + length > rbd_dev->parent_overlap) {
2296 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297 length = rbd_dev->parent_overlap - img_offset;
2301 * Allocate a page array big enough to receive the data read
2304 page_count = (u32)calc_pages_for(0, length);
2305 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306 if (IS_ERR(pages)) {
2307 result = PTR_ERR(pages);
/* Build a child (read) image request against the parent device */
2313 parent_request = rbd_img_request_create(rbd_dev->parent,
2316 if (!parent_request)
/* Link original request and child; reference dropped on unwind */
2318 rbd_obj_request_get(obj_request);
2319 parent_request->obj_request = obj_request;
2321 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2324 parent_request->copyup_pages = pages;
2326 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2327 result = rbd_img_request_submit(parent_request);
/* Error unwind: detach, free pages, and fail the original request */
2331 parent_request->copyup_pages = NULL;
2332 parent_request->obj_request = NULL;
2333 rbd_obj_request_put(obj_request);
2336 ceph_release_page_vector(pages, page_count);
2338 rbd_img_request_put(parent_request);
2339 obj_request->result = result;
2340 obj_request->xferred = 0;
2341 obj_request_done_set(obj_request);
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists on the original request (ENOENT is not treated as an
 * error), drops the stat request, and resubmits the original object
 * request — completing it immediately if resubmission fails or a
 * real error came back.
 */
2346 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2348 struct rbd_obj_request *orig_request;
/* The stat request itself is standalone, not image data */
2351 rbd_assert(!obj_request_img_data_test(obj_request));
2354 * All we need from the object request is the original
2355 * request and the result of the STAT op. Grab those, then
2356 * we're done with the request.
2358 orig_request = obj_request->obj_request;
2359 obj_request->obj_request = NULL;
2360 rbd_assert(orig_request);
2361 rbd_assert(orig_request->img_request);
2363 result = obj_request->result;
2364 obj_request->result = 0;
2366 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2367 obj_request, orig_request, result,
2368 obj_request->xferred, obj_request->length);
2369 rbd_obj_request_put(obj_request);
2371 rbd_assert(orig_request);
2372 rbd_assert(orig_request->img_request);
2375 * Our only purpose here is to determine whether the object
2376 * exists, and we don't want to treat the non-existence as
2377 * an error. If something else comes back, transfer the
2378 * error to the original request and complete it now.
2381 obj_request_existence_set(orig_request, true);
2382 } else if (result == -ENOENT) {
2383 obj_request_existence_set(orig_request, false);
2384 } else if (result) {
2385 orig_request->result = result;
2390 * Resubmit the original request now that we have recorded
2391 * whether the target object exists.
2393 orig_request->result = rbd_img_obj_request_submit(orig_request);
2395 if (orig_request->result)
2396 rbd_obj_request_complete(orig_request);
2397 rbd_obj_request_put(orig_request);
/*
 * Issue a STAT request for the target object of a layered write, to
 * learn whether it already exists.  The original object request is
 * stashed on the stat request and picked back up (and resubmitted)
 * by rbd_img_obj_exists_callback().  Returns 0 on successful
 * submission or a negative errno.
 */
2400 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2402 struct rbd_obj_request *stat_request;
2403 struct rbd_device *rbd_dev;
2404 struct ceph_osd_client *osdc;
2405 struct page **pages = NULL;
2411 * The response data for a STAT call consists of:
/* size/mtime reply: u64 length plus a two-u32 timespec */
2418 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2419 page_count = (u32)calc_pages_for(0, size);
2420 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2422 return PTR_ERR(pages);
/* Zero-length NODATA request on the same object name */
2425 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
/* Hold a reference to the original request until the callback runs */
2430 rbd_obj_request_get(obj_request);
2431 stat_request->obj_request = obj_request;
2432 stat_request->pages = pages;
2433 stat_request->page_count = page_count;
2435 rbd_assert(obj_request->img_request);
2436 rbd_dev = obj_request->img_request->rbd_dev;
2437 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2439 if (!stat_request->osd_req)
2441 stat_request->callback = rbd_img_obj_exists_callback;
2443 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2444 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2446 rbd_osd_req_format_read(stat_request);
2448 osdc = &rbd_dev->rbd_client->client->osdc;
2449 ret = rbd_obj_request_submit(osdc, stat_request);
/* Error path (elided above): release the original-request reference */
2452 rbd_obj_request_put(obj_request);
/*
 * Submit one object request belonging to an image request, routing
 * layered writes through the existence-check / copyup machinery:
 * simple cases go straight to the OSD; a layered write to an object
 * known not to exist triggers a parent read for copyup; and an
 * unknown target triggers a STAT submission first.
 */
2457 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2459 struct rbd_img_request *img_request;
2460 struct rbd_device *rbd_dev;
2463 rbd_assert(obj_request_img_data_test(obj_request));
2465 img_request = obj_request->img_request;
2466 rbd_assert(img_request);
2467 rbd_dev = img_request->rbd_dev;
2470 * Only writes to layered images need special handling.
2471 * Reads and non-layered writes are simple object requests.
2472 * Layered writes that start beyond the end of the overlap
2473 * with the parent have no parent data, so they too are
2474 * simple object requests. Finally, if the target object is
2475 * known to already exist, its parent data has already been
2476 * copied, so a write to the object can also be handled as a
2477 * simple object request.
2479 if (!img_request_write_test(img_request) ||
2480 !img_request_layered_test(img_request) ||
2481 rbd_dev->parent_overlap <= obj_request->img_offset ||
2482 ((known = obj_request_known_test(obj_request)) &&
2483 obj_request_exists_test(obj_request))) {
2485 struct rbd_device *rbd_dev;
2486 struct ceph_osd_client *osdc;
2488 rbd_dev = obj_request->img_request->rbd_dev;
2489 osdc = &rbd_dev->rbd_client->client->osdc;
2491 return rbd_obj_request_submit(osdc, obj_request);
2495 * It's a layered write. The target object might exist but
2496 * we may not know that yet. If we know it doesn't exist,
2497 * start by reading the data for the full target object from
2498 * the parent so we can use it for a copyup to the target.
2501 return rbd_img_obj_parent_read_full(obj_request);
2503 /* We don't know whether the target exists. Go find out. */
2505 return rbd_img_obj_exists_submit(obj_request);
/*
 * Submit every object request that makes up an image request.
 * The _safe iterator is used because a submitted object request may
 * complete (and be unlinked) before the loop advances.
 * NOTE(review): the loop body's error handling and the function's
 * return statement are elided in this excerpt.
 */
2508 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2510 struct rbd_obj_request *obj_request;
2511 struct rbd_obj_request *next_obj_request;
2513 dout("%s: img %p\n", __func__, img_request);
2514 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2517 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for a read issued to a parent image on behalf
 * of a child object request.  Propagates the parent request's result
 * to the original object request, clamps the transferred byte count
 * at the parent overlap boundary (so data beyond the overlap is
 * zeroed by the short-read handling), then completes the original
 * request.  Drops the reference the child image request held.
 */
2525 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2527 struct rbd_obj_request *obj_request;
2528 struct rbd_device *rbd_dev;
2531 rbd_assert(img_request_child_test(img_request));
2533 obj_request = img_request->obj_request;
2534 rbd_assert(obj_request);
2535 rbd_assert(obj_request->img_request);
2537 obj_request->result = img_request->result;
2538 if (obj_request->result)
2542 * We need to zero anything beyond the parent overlap
2543 * boundary. Since rbd_img_obj_request_read_callback()
2544 * will zero anything beyond the end of a short read, an
2545 * easy way to do this is to pretend the data from the
2546 * parent came up short--ending at the overlap boundary.
2548 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2549 obj_end = obj_request->img_offset + obj_request->length;
2550 rbd_dev = obj_request->img_request->rbd_dev;
2551 if (obj_end > rbd_dev->parent_overlap) {
2554 if (obj_request->img_offset < rbd_dev->parent_overlap)
2555 xferred = rbd_dev->parent_overlap -
2556 obj_request->img_offset;
2558 obj_request->xferred = min(img_request->xferred, xferred);
2560 obj_request->xferred = img_request->xferred;
2563 rbd_img_request_put(img_request);
2564 rbd_img_obj_request_read_callback(obj_request);
2565 rbd_obj_request_complete(obj_request);
/*
 * A read of a layered image's object returned -ENOENT (the object
 * does not exist in the child).  Build and submit an equivalent image
 * request against the parent image covering the same range, with
 * rbd_img_parent_read_callback() as its completion handler.
 * On failure the original request is completed with the error and
 * zero bytes transferred.
 * NOTE(review): error-check lines after rbd_img_request_create() and
 * rbd_img_request_fill()/submit() are elided in this excerpt.
 */
2568 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2570 struct rbd_device *rbd_dev;
2571 struct rbd_img_request *img_request;
2574 rbd_assert(obj_request_img_data_test(obj_request));
2575 rbd_assert(obj_request->img_request != NULL);
2576 rbd_assert(obj_request->result == (s32) -ENOENT);
2577 rbd_assert(obj_request_type_valid(obj_request->type));
2579 rbd_dev = obj_request->img_request->rbd_dev;
2580 rbd_assert(rbd_dev->parent != NULL);
2581 /* rbd_read_finish(obj_request, obj_request->length); */
2582 img_request = rbd_img_request_create(rbd_dev->parent,
2583 obj_request->img_offset,
2584 obj_request->length,
2590 rbd_obj_request_get(obj_request);
2591 img_request->obj_request = obj_request;
2593 if (obj_request->type == OBJ_REQUEST_BIO)
2594 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2595 obj_request->bio_list);
2597 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2598 obj_request->pages);
2602 img_request->callback = rbd_img_parent_read_callback;
2603 result = rbd_img_request_submit(img_request);
2610 rbd_img_request_put(img_request);
2611 obj_request->result = result;
2612 obj_request->xferred = 0;
2613 obj_request_done_set(obj_request);
/*
 * Acknowledge a watch notification on the image header object by
 * sending a NOTIFY_ACK osd op.  The request completes asynchronously;
 * its callback simply drops the object request reference.
 */
2616 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2618 struct rbd_obj_request *obj_request;
2619 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2622 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2623 OBJ_REQUEST_NODATA);
2628 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2629 if (!obj_request->osd_req)
2631 obj_request->callback = rbd_obj_request_put;
2633 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2635 rbd_osd_req_format_read(obj_request);
2637 ret = rbd_obj_request_submit(osdc, obj_request);
2640 rbd_obj_request_put(obj_request);
/*
 * Callback invoked when a watch notification arrives for the image
 * header object.  Refreshes the device's header information (logging
 * a warning on failure) and then acknowledges the notification.
 */
2645 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2647 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2653 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2654 rbd_dev->header_name, (unsigned long long)notify_id,
2655 (unsigned int)opcode);
2656 ret = rbd_dev_refresh(rbd_dev);
2658 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2660 rbd_obj_notify_ack(rbd_dev, notify_id);
2664 * Request sync osd watch/unwatch. The value of "start" determines
2665 * whether a watch request is being initiated or torn down.
/*
 * The watch request is marked as lingering so it survives osd session
 * resets until explicitly unregistered.  On start, a reference to the
 * object request is kept in rbd_dev->watch_request; on teardown that
 * reference is dropped, the linger registration removed, and the
 * watch event cancelled.
 * NOTE(review): several error-path lines (gotos after create/submit
 * failures) are elided in this excerpt.
 */
2667 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2669 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2670 struct rbd_obj_request *obj_request;
2673 rbd_assert(start ^ !!rbd_dev->watch_event);
2674 rbd_assert(start ^ !!rbd_dev->watch_request);
2677 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2678 &rbd_dev->watch_event);
2681 rbd_assert(rbd_dev->watch_event != NULL);
2685 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2686 OBJ_REQUEST_NODATA);
2690 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2691 if (!obj_request->osd_req)
2695 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2697 ceph_osdc_unregister_linger_request(osdc,
2698 rbd_dev->watch_request->osd_req);
2700 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2701 rbd_dev->watch_event->cookie, 0, start);
2702 rbd_osd_req_format_write(obj_request);
2704 ret = rbd_obj_request_submit(osdc, obj_request);
2707 ret = rbd_obj_request_wait(obj_request);
2710 ret = obj_request->result;
2715 * A watch request is set to linger, so the underlying osd
2716 * request won't go away until we unregister it. We retain
2717 * a pointer to the object request during that time (in
2718 * rbd_dev->watch_request), so we'll keep a reference to
2719 * it. We'll drop that reference (below) after we've
2723 rbd_dev->watch_request = obj_request;
2728 /* We have successfully torn down the watch request */
2730 rbd_obj_request_put(rbd_dev->watch_request);
2731 rbd_dev->watch_request = NULL;
2733 /* Cancel the event if we're tearing down, or on error */
2734 ceph_osdc_cancel_event(rbd_dev->watch_event);
2735 rbd_dev->watch_event = NULL;
2737 rbd_obj_request_put(obj_request);
2743 * Synchronous osd object method call. Returns the number of bytes
2744 * returned in the outbound buffer, or a negative error code.
/*
 * Issues a CEPH_OSD_OP_CALL (object class method) and waits for the
 * result.  "outbound"/"outbound_size" carry the method parameters
 * (sent via a pagelist); the response is received into a page vector
 * and copied into the caller's inbound buffer.
 * NOTE(review): allocation-failure checks and the out/out-err labels
 * are elided in this excerpt.
 */
2746 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2747 const char *object_name,
2748 const char *class_name,
2749 const char *method_name,
2750 const void *outbound,
2751 size_t outbound_size,
2753 size_t inbound_size)
2755 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2756 struct rbd_obj_request *obj_request;
2757 struct page **pages;
2762 * Method calls are ultimately read operations. The result
2763 * should placed into the inbound buffer provided. They
2764 * also supply outbound data--parameters for the object
2765 * method. Currently if this is present it will be a
2768 page_count = (u32)calc_pages_for(0, inbound_size);
2769 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2771 return PTR_ERR(pages);
2774 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2779 obj_request->pages = pages;
2780 obj_request->page_count = page_count;
2782 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2783 if (!obj_request->osd_req)
2786 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2787 class_name, method_name);
2788 if (outbound_size) {
2789 struct ceph_pagelist *pagelist;
2791 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2795 ceph_pagelist_init(pagelist);
2796 ceph_pagelist_append(pagelist, outbound, outbound_size);
2797 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2800 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2801 obj_request->pages, inbound_size,
2803 rbd_osd_req_format_read(obj_request);
2805 ret = rbd_obj_request_submit(osdc, obj_request);
2808 ret = rbd_obj_request_wait(obj_request);
2812 ret = obj_request->result;
2816 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2817 ret = (int)obj_request->xferred;
2818 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2821 rbd_obj_request_put(obj_request);
2823 ceph_release_page_vector(pages, page_count);
/*
 * Block-layer request function.  Pulls requests off the queue one at
 * a time, drops the queue lock while an image request is built and
 * submitted (hence the __releases/__acquires annotations), and
 * re-acquires it before ending the request on the error paths.
 * Non-FS and zero-length requests are completed immediately; writes
 * to read-only mappings and requests beyond the end of the device
 * are rejected.
 * NOTE(review): the read-only write rejection body and some
 * end_request error-path lines are elided in this excerpt.
 */
2828 static void rbd_request_fn(struct request_queue *q)
2829 __releases(q->queue_lock) __acquires(q->queue_lock)
2831 struct rbd_device *rbd_dev = q->queuedata;
2832 bool read_only = rbd_dev->mapping.read_only;
2836 while ((rq = blk_fetch_request(q))) {
2837 bool write_request = rq_data_dir(rq) == WRITE;
2838 struct rbd_img_request *img_request;
2842 /* Ignore any non-FS requests that filter through. */
2844 if (rq->cmd_type != REQ_TYPE_FS) {
2845 dout("%s: non-fs request type %d\n", __func__,
2846 (int) rq->cmd_type);
2847 __blk_end_request_all(rq, 0);
2851 /* Ignore/skip any zero-length requests */
2853 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2854 length = (u64) blk_rq_bytes(rq);
2857 dout("%s: zero-length request\n", __func__);
2858 __blk_end_request_all(rq, 0);
2862 spin_unlock_irq(q->queue_lock);
2864 /* Disallow writes to a read-only device */
2866 if (write_request) {
2870 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2874 * Quit early if the mapped snapshot no longer
2875 * exists. It's still possible the snapshot will
2876 * have disappeared by the time our request arrives
2877 * at the osd, but there's no sense in sending it if
2880 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2881 dout("request for non-existent snapshot");
2882 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2888 if (offset && length > U64_MAX - offset + 1) {
2889 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2891 goto end_request; /* Shouldn't happen */
2895 if (offset + length > rbd_dev->mapping.size) {
2896 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2897 offset, length, rbd_dev->mapping.size);
2902 img_request = rbd_img_request_create(rbd_dev, offset, length,
2903 write_request, false);
2907 img_request->rq = rq;
2909 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2912 result = rbd_img_request_submit(img_request);
2914 rbd_img_request_put(img_request);
2916 spin_lock_irq(q->queue_lock);
2918 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2919 write_request ? "write" : "read",
2920 length, offset, result);
2922 __blk_end_request_all(rq, result);
/*
 * merge_bvec queue callback: limit how many bytes of a bio-vec may
 * be merged into a bio so the bio never spans an rbd object boundary.
 */
2928 * a queue callback. Makes sure that we don't create a bio that spans across
2929 * multiple osd objects. One exception would be with a single page bios,
2930 * which we handle later at bio_chain_clone_range()
2932 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2933 struct bio_vec *bvec)
2935 struct rbd_device *rbd_dev = q->queuedata;
2936 sector_t sector_offset;
2937 sector_t sectors_per_obj;
2938 sector_t obj_sector_offset;
2942 * Find how far into its rbd object the partition-relative
2943 * bio start sector is to offset relative to the enclosing
2946 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2947 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2948 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2951 * Compute the number of bytes from that offset to the end
2952 * of the object. Account for what's already used by the bio.
2954 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2955 if (ret > bmd->bi_size)
2956 ret -= bmd->bi_size;
2961 * Don't send back more than was asked for. And if the bio
2962 * was empty, let the whole thing through because: "Note
2963 * that a block device *must* allow a single page to be
2964 * added to an empty bio."
2966 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2967 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2968 ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk associated with an rbd device: clear the
 * device's disk pointer, and clean up the request queue.  The
 * GENHD_FL_UP test distinguishes a disk that was added from one
 * that was only allocated (the del_gendisk call is elided here).
 */
2973 static void rbd_free_disk(struct rbd_device *rbd_dev)
2975 struct gendisk *disk = rbd_dev->disk;
2980 rbd_dev->disk = NULL;
2981 if (disk->flags & GENHD_FL_UP) {
2984 blk_cleanup_queue(disk->queue);
/*
 * Synchronously read "length" bytes at "offset" from the named rados
 * object into "buf".  Data is received into an allocated page vector
 * and copied out on success.  Returns the number of bytes read
 * (presumably -- the final return is elided here) or a negative errno.
 */
2989 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2990 const char *object_name,
2991 u64 offset, u64 length, void *buf)
2994 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2995 struct rbd_obj_request *obj_request;
2996 struct page **pages = NULL;
3001 page_count = (u32) calc_pages_for(offset, length);
3002 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3004 ret = PTR_ERR(pages);
3007 obj_request = rbd_obj_request_create(object_name, offset, length,
3012 obj_request->pages = pages;
3013 obj_request->page_count = page_count;
3015 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3016 if (!obj_request->osd_req)
3019 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3020 offset, length, 0, 0);
3021 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3023 obj_request->length,
3024 obj_request->offset & ~PAGE_MASK,
3026 rbd_osd_req_format_read(obj_request);
3028 ret = rbd_obj_request_submit(osdc, obj_request);
3031 ret = rbd_obj_request_wait(obj_request);
3035 ret = obj_request->result;
3039 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3040 size = (size_t) obj_request->xferred;
3041 ceph_copy_from_page_vector(pages, buf, 0, size);
3042 rbd_assert(size <= (size_t)INT_MAX);
3046 rbd_obj_request_put(obj_request);
3048 ceph_release_page_vector(pages, page_count);
3054 * Read the complete header for the given rbd device. On successful
3055 * return, the rbd_dev->header field will contain up-to-date
3056 * information about the image.
/*
 * Format-1 images keep the whole header (including snapshot ids and
 * names) in a single on-disk structure.  Because the snapshot count
 * can change between sizing and reading, the read is retried in a
 * loop until the count we sized for matches the count we got.
 * NOTE(review): the loop's allocation/read error checks and the
 * kfree of "ondisk" are elided in this excerpt.
 */
3058 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3060 struct rbd_image_header_ondisk *ondisk = NULL;
3067 * The complete header will include an array of its 64-bit
3068 * snapshot ids, followed by the names of those snapshots as
3069 * a contiguous block of NUL-terminated strings. Note that
3070 * the number of snapshots could change by the time we read
3071 * it in, in which case we re-read it.
3078 size = sizeof (*ondisk);
3079 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3081 ondisk = kmalloc(size, GFP_KERNEL);
3085 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3089 if ((size_t)ret < size) {
3091 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3095 if (!rbd_dev_ondisk_valid(ondisk)) {
3097 rbd_warn(rbd_dev, "invalid header");
3101 names_size = le64_to_cpu(ondisk->snap_names_len);
3102 want_count = snap_count;
3103 snap_count = le32_to_cpu(ondisk->snap_count);
3104 } while (snap_count != want_count);
3106 ret = rbd_header_from_disk(rbd_dev, ondisk);
3114 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3115 * has disappeared from the (just updated) snapshot context.
/* No-op for base-image (CEPH_NOSNAP) mappings or if already cleared. */
3117 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3121 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3124 snap_id = rbd_dev->spec->snap_id;
3125 if (snap_id == CEPH_NOSNAP)
3128 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3129 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
/*
 * Re-read the image header (format 1 or 2) under ctl_mutex, then
 * revalidate the mapped snapshot's EXISTS flag.  If the mapping size
 * changed, propagate the new capacity (in sectors) to the gendisk.
 */
3132 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3137 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3138 mapping_size = rbd_dev->mapping.size;
3139 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3140 if (rbd_dev->image_format == 1)
3141 ret = rbd_dev_v1_header_info(rbd_dev);
3143 ret = rbd_dev_v2_header_info(rbd_dev);
3145 /* If it's a mapped snapshot, validate its EXISTS flag */
3147 rbd_exists_validate(rbd_dev);
3148 mutex_unlock(&ctl_mutex);
3149 if (mapping_size != rbd_dev->mapping.size) {
3152 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3153 dout("setting size to %llu sectors", (unsigned long long)size);
3154 set_capacity(rbd_dev->disk, size);
3155 revalidate_disk(rbd_dev->disk);
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  I/O limits (max sectors, segment size, io_min/io_opt) are
 * all set to the rbd object size so requests align with objects, and
 * rbd_merge_bvec keeps bios from spanning object boundaries.
 * The disk is stored in rbd_dev->disk; add_disk() is presumably
 * called later by the caller -- the tail of this function is elided.
 */
3163 static int rbd_init_disk(struct rbd_device *rbd_dev)
3165 struct gendisk *disk;
3164 struct request_queue *q;
3167 /* create gendisk info */
3168 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3172 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3174 disk->major = rbd_dev->major;
3175 disk->first_minor = 0;
3176 disk->fops = &rbd_bd_ops;
3177 disk->private_data = rbd_dev;
3179 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3183 /* We use the default size, but let's be explicit about it. */
3184 blk_queue_physical_block_size(q, SECTOR_SIZE);
3186 /* set io sizes to object size */
3187 segment_size = rbd_obj_bytes(&rbd_dev->header);
3188 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3189 blk_queue_max_segment_size(q, segment_size);
3190 blk_queue_io_min(q, segment_size);
3191 blk_queue_io_opt(q, segment_size);
3193 blk_queue_merge_bvec(q, rbd_merge_bvec);
3196 q->queuedata = rbd_dev;
3198 rbd_dev->disk = disk;
/* Map a sysfs struct device back to its embedding rbd_device. */
3211 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3213 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": the mapped image/snapshot size in bytes. */
3216 static ssize_t rbd_size_show(struct device *dev,
3217 struct device_attribute *attr, char *buf)
3219 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3221 return sprintf(buf, "%llu\n",
3222 (unsigned long long)rbd_dev->mapping.size);
3226 * Note this shows the features for whatever's mapped, which is not
3227 * necessarily the base image.
/* sysfs "features": mapped feature bits as a 64-bit hex value. */
3229 static ssize_t rbd_features_show(struct device *dev,
3230 struct device_attribute *attr, char *buf)
3232 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3234 return sprintf(buf, "0x%016llx\n",
3235 (unsigned long long)rbd_dev->mapping.features);
/* sysfs "major": block device major number, or "(none)" if unset. */
3238 static ssize_t rbd_major_show(struct device *dev,
3239 struct device_attribute *attr, char *buf)
3241 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244 return sprintf(buf, "%d\n", rbd_dev->major);
3246 return sprintf(buf, "(none)\n");
/* sysfs "client_id": this node's ceph client instance id. */
3250 static ssize_t rbd_client_id_show(struct device *dev,
3251 struct device_attribute *attr, char *buf)
3253 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3255 return sprintf(buf, "client%lld\n",
3256 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool holding the image. */
3259 static ssize_t rbd_pool_show(struct device *dev,
3260 struct device_attribute *attr, char *buf)
3262 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3264 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
/* sysfs "pool_id": numeric id of the rados pool holding the image. */
3267 static ssize_t rbd_pool_id_show(struct device *dev,
3268 struct device_attribute *attr, char *buf)
3270 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3272 return sprintf(buf, "%llu\n",
3273 (unsigned long long) rbd_dev->spec->pool_id);
/* sysfs "name": image name, or "(unknown)" if it couldn't be found. */
3276 static ssize_t rbd_name_show(struct device *dev,
3277 struct device_attribute *attr, char *buf)
3279 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281 if (rbd_dev->spec->image_name)
3282 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3284 return sprintf(buf, "(unknown)\n");
/* sysfs "image_id": the image's unique rados id string. */
3287 static ssize_t rbd_image_id_show(struct device *dev,
3288 struct device_attribute *attr, char *buf)
3290 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3292 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3296 * Shows the name of the currently-mapped snapshot (or
3297 * RBD_SNAP_HEAD_NAME for the base image).
3299 static ssize_t rbd_snap_show(struct device *dev,
3300 struct device_attribute *attr,
3303 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3305 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3309 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3310 * for the parent image. If there is no parent, simply shows
3311 * "(no parent image)".
/*
 * Output is built incrementally into "buf" via a moving "bufp"
 * cursor; the advancing of bufp between sprintf calls is elided in
 * this excerpt.
 */
3313 static ssize_t rbd_parent_show(struct device *dev,
3314 struct device_attribute *attr,
3317 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3318 struct rbd_spec *spec = rbd_dev->parent_spec;
3323 return sprintf(buf, "(no parent image)\n");
3325 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3326 (unsigned long long) spec->pool_id, spec->pool_name);
3331 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3332 spec->image_name ? spec->image_name : "(unknown)");
3337 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3338 (unsigned long long) spec->snap_id, spec->snap_name);
3343 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3348 return (ssize_t) (bufp - buf);
/*
 * sysfs "refresh" (write-only): force a header re-read.  Returns the
 * write size on success so the store is considered fully consumed.
 */
3351 static ssize_t rbd_image_refresh(struct device *dev,
3352 struct device_attribute *attr,
3356 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3359 ret = rbd_dev_refresh(rbd_dev);
3361 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3363 return ret < 0 ? ret : size;
/*
 * sysfs plumbing: per-device attributes (all read-only except
 * "refresh", which is write-only), gathered into an attribute group
 * and bound to the rbd device type.  The release callback is empty
 * (presumably -- its body is elided) because rbd_device lifetime is
 * managed elsewhere.
 */
3366 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3367 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3368 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3369 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3370 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3371 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3372 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3373 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3374 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3375 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3376 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3378 static struct attribute *rbd_attrs[] = {
3379 &dev_attr_size.attr,
3380 &dev_attr_features.attr,
3381 &dev_attr_major.attr,
3382 &dev_attr_client_id.attr,
3383 &dev_attr_pool.attr,
3384 &dev_attr_pool_id.attr,
3385 &dev_attr_name.attr,
3386 &dev_attr_image_id.attr,
3387 &dev_attr_current_snap.attr,
3388 &dev_attr_parent.attr,
3389 &dev_attr_refresh.attr,
3393 static struct attribute_group rbd_attr_group = {
3397 static const struct attribute_group *rbd_attr_groups[] = {
3402 static void rbd_sysfs_dev_release(struct device *dev)
3406 static struct device_type rbd_device_type = {
3408 .groups = rbd_attr_groups,
3409 .release = rbd_sysfs_dev_release,
/* Take a reference on an rbd_spec (returns it -- tail elided). */
3412 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3414 kref_get(&spec->kref);
/* Drop a reference on an rbd_spec; frees it when the count hits 0. */
3419 static void rbd_spec_free(struct kref *kref);
3420 static void rbd_spec_put(struct rbd_spec *spec)
3423 kref_put(&spec->kref, rbd_spec_free);
/* Allocate a zeroed, reference-counted rbd_spec (refcount = 1). */
3426 static struct rbd_spec *rbd_spec_alloc(void)
3428 struct rbd_spec *spec;
3430 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3433 kref_init(&spec->kref);
/*
 * kref release callback for rbd_spec: free all dynamically allocated
 * name/id strings (kfree(NULL) is a no-op, so unset fields are fine).
 */
3438 static void rbd_spec_free(struct kref *kref)
3440 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3442 kfree(spec->pool_name);
3443 kfree(spec->image_id);
3444 kfree(spec->image_name);
3445 kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd_device.  Takes ownership of the
 * caller's references to "rbdc" and "spec" (they are dropped in
 * rbd_dev_destroy()).  The ceph file layout is initialized with the
 * maximum object order; it is refined once the header is read.
 */
3449 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3450 struct rbd_spec *spec)
3452 struct rbd_device *rbd_dev;
3454 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3458 spin_lock_init(&rbd_dev->lock);
3460 INIT_LIST_HEAD(&rbd_dev->node);
3461 init_rwsem(&rbd_dev->header_rwsem);
3463 rbd_dev->spec = spec;
3464 rbd_dev->rbd_client = rbdc;
3466 /* Initialize the layout used for all rbd requests */
3468 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3469 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3470 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3471 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/* Release the client and spec references held by an rbd_device. */
3476 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3478 rbd_put_client(rbd_dev->rbd_client);
3479 rbd_spec_put(rbd_dev->spec);
3484 * Get the size and object order for an image snapshot, or if
3485 * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Calls the "get_size" object class method on the header object.
 * "order" may be NULL (the conditional guarding the store is elided
 * in this excerpt).  Returns 0 on success or a negative errno.
 */
3488 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3489 u8 *order, u64 *snap_size)
3491 __le64 snapid = cpu_to_le64(snap_id);
3496 } __attribute__ ((packed)) size_buf = { 0 };
3498 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3500 &snapid, sizeof (snapid),
3501 &size_buf, sizeof (size_buf));
3502 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3505 if (ret < sizeof (size_buf))
3509 *order = size_buf.order;
3510 *snap_size = le64_to_cpu(size_buf.size);
3512 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3513 (unsigned long long)snap_id, (unsigned int)*order,
3514 (unsigned long long)*snap_size);
/* Fetch size and object order for the base image (CEPH_NOSNAP). */
3519 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3521 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3522 &rbd_dev->header.obj_order,
3523 &rbd_dev->header.image_size);
/*
 * Fetch the image's object name prefix via the "get_object_prefix"
 * class method and store the decoded string in the header.  On
 * decode failure the header field is reset to NULL so it is never
 * left pointing at an ERR_PTR.
 */
3526 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3532 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3536 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3537 "rbd", "get_object_prefix", NULL, 0,
3538 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3539 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3544 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3545 p + ret, NULL, GFP_NOIO);
3548 if (IS_ERR(rbd_dev->header.object_prefix)) {
3549 ret = PTR_ERR(rbd_dev->header.object_prefix);
3550 rbd_dev->header.object_prefix = NULL;
3552 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for a snapshot (or the base image when
 * snap_id is CEPH_NOSNAP) via the "get_features" class method.
 * Fails if any incompatible feature bit outside
 * RBD_FEATURES_SUPPORTED is set (the returned errno is elided here).
 */
3560 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3563 __le64 snapid = cpu_to_le64(snap_id);
3567 } __attribute__ ((packed)) features_buf = { 0 };
3571 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3572 "rbd", "get_features",
3573 &snapid, sizeof (snapid),
3574 &features_buf, sizeof (features_buf));
3575 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3578 if (ret < sizeof (features_buf))
3581 incompat = le64_to_cpu(features_buf.incompat);
3582 if (incompat & ~RBD_FEATURES_SUPPORTED)
3585 *snap_features = le64_to_cpu(features_buf.features);
3587 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3588 (unsigned long long)snap_id,
3589 (unsigned long long)*snap_features,
3590 (unsigned long long)le64_to_cpu(features_buf.incompat));
/* Fetch feature bits for the base image (CEPH_NOSNAP). */
3595 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3597 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3598 &rbd_dev->header.features);
/*
 * Query the "get_parent" class method and, if the image has a
 * parent, record its spec (pool id, image id, snap id) and the
 * parent overlap in the rbd_device.  CEPH_NOPOOL in the reply means
 * no parent, which is not an error.  Ownership of parent_spec
 * transfers to rbd_dev on success; otherwise the final
 * rbd_spec_put() releases it.
 */
3601 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3603 struct rbd_spec *parent_spec;
3605 void *reply_buf = NULL;
3613 parent_spec = rbd_spec_alloc();
3617 size = sizeof (__le64) + /* pool_id */
3618 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3619 sizeof (__le64) + /* snap_id */
3620 sizeof (__le64); /* overlap */
3621 reply_buf = kmalloc(size, GFP_KERNEL);
3627 snapid = cpu_to_le64(CEPH_NOSNAP);
3628 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3629 "rbd", "get_parent",
3630 &snapid, sizeof (snapid),
3632 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3637 end = reply_buf + ret;
3639 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3640 if (parent_spec->pool_id == CEPH_NOPOOL)
3641 goto out; /* No parent? No problem. */
3643 /* The ceph file layout needs to fit pool id in 32 bits */
3646 if (parent_spec->pool_id > (u64)U32_MAX) {
3647 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3648 (unsigned long long)parent_spec->pool_id, U32_MAX);
3652 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3653 if (IS_ERR(image_id)) {
3654 ret = PTR_ERR(image_id);
3657 parent_spec->image_id = image_id;
3658 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3659 ceph_decode_64_safe(&p, end, overlap, out_err);
3661 rbd_dev->parent_overlap = overlap;
3662 rbd_dev->parent_spec = parent_spec;
3663 parent_spec = NULL; /* rbd_dev now owns this */
3668 rbd_spec_put(parent_spec);
/*
 * Query "get_stripe_unit_count".  Fancy striping (STRIPINGV2) is not
 * supported, so the call only succeeds when stripe_unit equals the
 * object size and stripe_count is 1 -- i.e. the defaults, which
 * behave identically to no striping.  The rejected-case errno
 * returns are elided in this excerpt.
 */
3673 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3677 __le64 stripe_count;
3678 } __attribute__ ((packed)) striping_info_buf = { 0 };
3679 size_t size = sizeof (striping_info_buf);
3686 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3687 "rbd", "get_stripe_unit_count", NULL, 0,
3688 (char *)&striping_info_buf, size);
3689 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3696 * We don't actually support the "fancy striping" feature
3697 * (STRIPINGV2) yet, but if the striping sizes are the
3698 * defaults the behavior is the same as before. So find
3699 * out, and only fail if the image has non-default values.
3702 obj_size = (u64)1 << rbd_dev->header.obj_order;
3703 p = &striping_info_buf;
3704 stripe_unit = ceph_decode_64(&p);
3705 if (stripe_unit != obj_size) {
3706 rbd_warn(rbd_dev, "unsupported stripe unit "
3707 "(got %llu want %llu)",
3708 stripe_unit, obj_size);
3711 stripe_count = ceph_decode_64(&p);
3712 if (stripe_count != 1) {
3713 rbd_warn(rbd_dev, "unsupported stripe count "
3714 "(got %llu want 1)", stripe_count);
3717 rbd_dev->header.stripe_unit = stripe_unit;
3718 rbd_dev->header.stripe_count = stripe_count;
/*
 * Look up the image name for this device's image id by calling
 * "dir_get_name" on the RBD_DIRECTORY object.  Returns an allocated
 * name string owned by the caller, or (presumably) NULL on failure
 * -- the out/free paths are elided in this excerpt.
 */
3723 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3725 size_t image_id_size;
3730 void *reply_buf = NULL;
3732 char *image_name = NULL;
3735 rbd_assert(!rbd_dev->spec->image_name);
3737 len = strlen(rbd_dev->spec->image_id);
3738 image_id_size = sizeof (__le32) + len;
3739 image_id = kmalloc(image_id_size, GFP_KERNEL);
3744 end = image_id + image_id_size;
3745 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3747 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3748 reply_buf = kmalloc(size, GFP_KERNEL);
3752 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3753 "rbd", "dir_get_name",
3754 image_id, image_id_size,
3759 end = reply_buf + ret;
3761 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3762 if (IS_ERR(image_name))
3765 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
/*
 * Format-1 snapshot lookup: walk the concatenated NUL-terminated
 * snapshot names in the header in parallel with the snap id array,
 * returning the id whose name matches (CEPH_NOSNAP on no match --
 * the final return and "which" increment are elided here).
 */
3773 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3775 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3776 const char *snap_name;
3779 /* Skip over names until we find the one we are looking for */
3781 snap_name = rbd_dev->header.snap_names;
3782 while (which < snapc->num_snaps) {
3783 if (!strcmp(name, snap_name))
3784 return snapc->snaps[which];
3785 snap_name += strlen(snap_name) + 1;
/*
 * Format-2 snapshot lookup: fetch each snapshot's name from the osd
 * and compare until a match is found.  A name-fetch error ends the
 * search early (falls through to "not found").
 */
3791 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3793 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3798 for (which = 0; !found && which < snapc->num_snaps; which++) {
3799 const char *snap_name;
3801 snap_id = snapc->snaps[which];
3802 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3803 if (IS_ERR(snap_name))
3805 found = !strcmp(name, snap_name);
3808 return found ? snap_id : CEPH_NOSNAP;
3812 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3813 * no snapshot by that name is found, or if an error occurs.
/* Dispatch on image format (1 or 2). */
3815 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3817 if (rbd_dev->image_format == 1)
3818 return rbd_v1_snap_id_by_name(rbd_dev, name);
3820 return rbd_v2_snap_id_by_name(rbd_dev, name);
3824 * When an rbd image has a parent image, it is identified by the
3825 * pool, image, and snapshot ids (not names). This function fills
3826 * in the names for those ids. (It's OK if we can't figure out the
3827 * name for an image id, but the pool and snapshot ids should always
3828 * exist and have names.) All names in an rbd spec are dynamically
/*
 * Two cases: a mapped image already has names and only needs its
 * snapshot id resolved; a parent spec has ids and needs the pool,
 * image, and snapshot names filled in (pool name copied because the
 * osdmap string is not owned by us).  Error/cleanup paths are
 * elided in this excerpt.
 */
3831 * When an image being mapped (not a parent) is probed, we have the
3832 * pool name and pool id, image name and image id, and the snapshot
3833 * name. The only thing we're missing is the snapshot id.
3835 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3837 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3838 struct rbd_spec *spec = rbd_dev->spec;
3839 const char *pool_name;
3840 const char *image_name;
3841 const char *snap_name;
3845 * An image being mapped will have the pool name (etc.), but
3846 * we need to look up the snapshot id.
3848 if (spec->pool_name) {
3849 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3852 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3853 if (snap_id == CEPH_NOSNAP)
3855 spec->snap_id = snap_id;
3857 spec->snap_id = CEPH_NOSNAP;
3863 /* Get the pool name; we have to make our own copy of this */
3865 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3867 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3870 pool_name = kstrdup(pool_name, GFP_KERNEL);
3874 /* Fetch the image name; tolerate failure here */
3876 image_name = rbd_dev_image_name(rbd_dev);
3878 rbd_warn(rbd_dev, "unable to get image name");
3880 /* Look up the snapshot name, and make a copy */
3882 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3888 spec->pool_name = pool_name;
3889 spec->image_name = image_name;
3890 spec->snap_name = snap_name;
/*
 * Fetch the image's snapshot context (seq plus snapshot id array) from
 * the OSDs via the "get_snapcontext" class method and install it in
 * rbd_dev->header, replacing (and releasing) any previous context.
 */
3900 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3909 struct ceph_snap_context *snapc;
3913 * We'll need room for the seq value (maximum snapshot id),
3914 * snapshot count, and array of that many snapshot ids.
3915 * For now we have a fixed upper limit on the number we're
3916 * prepared to receive.
3918 size = sizeof (__le64) + sizeof (__le32) +
3919 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3920 reply_buf = kzalloc(size, GFP_KERNEL);
3924 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3925 "rbd", "get_snapcontext", NULL, 0,
3927 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3932 end = reply_buf + ret;
/* _safe decode macros jump to "out" on a short buffer. */
3934 ceph_decode_64_safe(&p, end, seq, out);
3935 ceph_decode_32_safe(&p, end, snap_count, out);
3938 * Make sure the reported number of snapshot ids wouldn't go
3939 * beyond the end of our buffer. But before checking that,
3940 * make sure the computed size of the snapshot context we
3941 * allocate is representable in a size_t.
3943 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3948 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3952 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
/* Decode the snapshot ids into the freshly allocated context. */
3958 for (i = 0; i < snap_count; i++)
3959 snapc->snaps[i] = ceph_decode_64(&p);
/* Drop the reference to the old context before installing the new. */
3961 ceph_put_snap_context(rbd_dev->header.snapc);
3962 rbd_dev->header.snapc = snapc;
3964 dout(" snap context seq = %llu, snap_count = %u\n",
3965 (unsigned long long)seq, (unsigned int)snap_count);
/*
 * Look up the name of the snapshot with the given id for a format 2
 * image, using the "get_snapshot_name" class method.  Returns a
 * dynamically allocated string the caller must free, or an ERR_PTR
 * on failure.
 */
3972 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
/* Reply holds a length-prefixed string no longer than a snap name. */
3983 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3984 reply_buf = kmalloc(size, GFP_KERNEL);
3986 return ERR_PTR(-ENOMEM);
3988 snapid = cpu_to_le64(snap_id);
3989 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3990 "rbd", "get_snapshot_name",
3991 &snapid, sizeof (snapid),
3993 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
/* Negative return from the method call becomes our ERR_PTR. */
3995 snap_name = ERR_PTR(ret);
4000 end = reply_buf + ret;
/* Extracted string is a fresh allocation owned by the caller. */
4001 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4002 if (IS_ERR(snap_name))
4005 dout(" snap_id 0x%016llx snap_name = %s\n",
4006 (unsigned long long)snap_id, snap_name);
/*
 * Refresh (or initially populate) header information for a format 2
 * image.  One-time fields (object prefix, features, etc.) are fetched
 * only on the first call; size and snapshot context are refreshed on
 * every call.  Runs under the header_rwsem write lock.
 */
4013 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
/* A NULL object_prefix means the header was never populated. */
4015 bool first_time = rbd_dev->header.object_prefix == NULL;
4018 down_write(&rbd_dev->header_rwsem);
4021 ret = rbd_dev_v2_header_onetime(rbd_dev);
4026 ret = rbd_dev_v2_image_size(rbd_dev);
/* Only a head (non-snapshot) mapping tracks the live image size. */
4029 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4030 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4031 rbd_dev->mapping.size = rbd_dev->header.image_size;
4033 ret = rbd_dev_v2_snap_context(rbd_dev);
4034 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4038 up_write(&rbd_dev->header_rwsem);
/*
 * Register the rbd device on the rbd bus in the device model, naming
 * it after its numeric dev_id.  Serialized by ctl_mutex.
 */
4043 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4048 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4050 dev = &rbd_dev->dev;
4051 dev->bus = &rbd_bus_type;
4052 dev->type = &rbd_device_type;
4053 dev->parent = &rbd_root_dev;
/* release callback tears down the block device side (see
 * rbd_dev_device_release). */
4054 dev->release = rbd_dev_device_release;
4055 dev_set_name(dev, "%d", rbd_dev->dev_id);
4056 ret = device_register(dev);
4058 mutex_unlock(&ctl_mutex);
/* Unregister the device; the driver-model release callback does the
 * actual cleanup. */
4063 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4065 device_unregister(&rbd_dev->dev);
/* Monotonic source of device ids; holds the current maximum. */
4068 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4071 * Get a unique rbd identifier for the given new rbd_dev, and add
4072 * the rbd_dev to the global list. The minimum rbd id is 1.
4074 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4076 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
/* Publish the device on the global list under the list spinlock. */
4078 spin_lock(&rbd_dev_list_lock);
4079 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4080 spin_unlock(&rbd_dev_list_lock);
4081 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4082 (unsigned long long) rbd_dev->dev_id);
4086 * Remove an rbd_dev from the global list, and record that its
4087 * identifier is no longer in use.
4089 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4091 struct list_head *tmp;
4092 int rbd_id = rbd_dev->dev_id;
/* Ids start at 1 (rbd_dev_id_get), so 0 here indicates a bug. */
4095 rbd_assert(rbd_id > 0);
4097 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4098 (unsigned long long) rbd_dev->dev_id);
4099 spin_lock(&rbd_dev_list_lock);
4100 list_del_init(&rbd_dev->node);
4103 * If the id being "put" is not the current maximum, there
4104 * is nothing special we need to do.
4106 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4107 spin_unlock(&rbd_dev_list_lock);
4112 * We need to update the current maximum id. Search the
4113 * list to find out what it is. We're more likely to find
4114 * the maximum at the end, so search the list backward.
4117 list_for_each_prev(tmp, &rbd_dev_list) {
4118 struct rbd_device *rbd_dev;
4120 rbd_dev = list_entry(tmp, struct rbd_device, node);
4121 if (rbd_dev->dev_id > max_id)
4122 max_id = rbd_dev->dev_id;
4124 spin_unlock(&rbd_dev_list_lock);
4127 * The max id could have been updated by rbd_dev_id_get(), in
4128 * which case it now accurately reflects the new maximum.
4129 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers the max if nobody raised it meanwhile. */
4132 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
/* NOTE(review): this message prints even when the cmpxchg did not
 * swap — harmless, but slightly misleading in debug logs. */
4133 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';	/* always NUL-terminate the copy */
	}
	/* Advance past the token even when it did not fit. */
	*buf += len;

	return len;
}
4186 * Finds the next token in *buf, dynamically allocates a buffer big
4187 * enough to hold a copy of it, and copies the token into the new
4188 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4189 * that a duplicate buffer is created even for a zero-length token.
4191 * Returns a pointer to the newly-allocated duplicate, or a null
4192 * pointer if memory for the duplicate was not available. If
4193 * the lenp argument is a non-null pointer, the length of the token
4194 * (not including the '\0') is returned in *lenp.
4196 * If successful, the *buf pointer will be updated to point beyond
4197 * the end of the found token.
4199 * Note: uses GFP_KERNEL for allocation.
4201 static inline char *dup_token(const char **buf, size_t *lenp)
4206 len = next_token(buf);
4207 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4210 *(dup + len) = '\0';
4220 * Parse the options provided for an "rbd add" (i.e., rbd image
4221 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4222 * and the data written is passed here via a NUL-terminated buffer.
4223 * Returns 0 if successful or an error code otherwise.
4225 * The information extracted from these options is recorded in
4226 * the other parameters which return dynamically-allocated
4229 * The address of a pointer that will refer to a ceph options
4230 * structure. Caller must release the returned pointer using
4231 * ceph_destroy_options() when it is no longer needed.
4233 * Address of an rbd options pointer. Fully initialized by
4234 * this function; caller must release with kfree().
4236 * Address of an rbd image specification pointer. Fully
4237 * initialized by this function based on parsed options.
4238 * Caller must release with rbd_spec_put().
4240 * The options passed take this form:
4241 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4244 * A comma-separated list of one or more monitor addresses.
4245 * A monitor address is an ip address, optionally followed
4246 * by a port number (separated by a colon).
4247 * I.e.: ip1[:port1][,ip2[:port2]...]
4249 * A comma-separated list of ceph and/or rbd options.
4251 * The name of the rados pool containing the rbd image.
4253 * The name of the image in that pool to map.
4255 * An optional snapshot id. If provided, the mapping will
4256 * present data from the image at the time that snapshot was
4257 * created. The image head is used if no snapshot id is
4258 * provided. Snapshot mappings are always read-only.
4260 static int rbd_add_parse_args(const char *buf,
4261 struct ceph_options **ceph_opts,
4262 struct rbd_options **opts,
4263 struct rbd_spec **rbd_spec)
4267 const char *mon_addrs;
4269 size_t mon_addrs_size;
4270 struct rbd_spec *spec = NULL;
4271 struct rbd_options *rbd_opts = NULL;
4272 struct ceph_options *copts;
/* NOTE(review): some lines (locals, error unwinding, final returns)
 * are missing from this extract. */
4275 /* The first four tokens are required */
4277 len = next_token(&buf);
4279 rbd_warn(NULL, "no monitor address(es) provided");
/* mon_addrs is not copied; its length (+1) is remembered so the end
 * pointer can be handed to ceph_parse_options() below. */
4283 mon_addrs_size = len + 1;
4287 options = dup_token(&buf, NULL);
4291 rbd_warn(NULL, "no options provided");
4295 spec = rbd_spec_alloc();
4299 spec->pool_name = dup_token(&buf, NULL);
4300 if (!spec->pool_name)
/* dup_token returns "" (not NULL) for a missing token; reject it. */
4302 if (!*spec->pool_name) {
4303 rbd_warn(NULL, "no pool name provided");
4307 spec->image_name = dup_token(&buf, NULL);
4308 if (!spec->image_name)
4310 if (!*spec->image_name) {
4311 rbd_warn(NULL, "no image name provided");
4316 * Snapshot name is optional; default is to use "-"
4317 * (indicating the head/no snapshot).
4319 len = next_token(&buf);
4321 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4322 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4323 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4324 ret = -ENAMETOOLONG;
4327 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4330 *(snap_name + len) = '\0';
4331 spec->snap_name = snap_name;
4333 /* Initialize all rbd options to the defaults */
4335 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4339 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
/* parse_rbd_opts_token handles rbd-specific options; ceph options
 * are consumed by ceph_parse_options itself. */
4341 copts = ceph_parse_options(options, mon_addrs,
4342 mon_addrs + mon_addrs_size - 1,
4343 parse_rbd_opts_token, rbd_opts);
4344 if (IS_ERR(copts)) {
4345 ret = PTR_ERR(copts);
4366 * An rbd format 2 image has a unique identifier, distinct from the
4367 * name given to it by the user. Internally, that identifier is
4368 * what's used to specify the names of objects related to the image.
4370 * A special "rbd id" object is used to map an rbd image name to its
4371 * id. If that object doesn't exist, then there is no v2 rbd image
4372 * with the supplied name.
4374 * This function will record the given rbd_dev's image_id field if
4375 * it can be determined, and in that case will return 0. If any
4376 * errors occur a negative errno will be returned and the rbd_dev's
4377 * image_id field will be unchanged (and should be NULL).
4379 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4388 * When probing a parent image, the image id is already
4389 * known (and the image name likely is not). There's no
4390 * need to fetch the image id again in this case. We
4391 * do still need to set the image format though.
/* Empty image_id string is the sentinel for a format 1 image. */
4393 if (rbd_dev->spec->image_id) {
4394 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4400 * First, see if the format 2 image id file exists, and if
4401 * so, get the image's persistent id from it.
4403 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
/* GFP_NOIO: this can run on a path where I/O must not recurse. */
4404 object_name = kmalloc(size, GFP_NOIO);
4407 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4408 dout("rbd id object name is %s\n", object_name);
4410 /* Response will be an encoded string, which includes a length */
4412 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4413 response = kzalloc(size, GFP_NOIO);
4419 /* If it doesn't exist we'll assume it's a format 1 image */
4421 ret = rbd_obj_method_sync(rbd_dev, object_name,
4422 "rbd", "get_id", NULL, 0,
4423 response, RBD_IMAGE_ID_LEN_MAX);
4424 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
/* ENOENT: no id object, so treat as format 1 with an empty id. */
4425 if (ret == -ENOENT) {
4426 image_id = kstrdup("", GFP_KERNEL);
4427 ret = image_id ? 0 : -ENOMEM;
4429 rbd_dev->image_format = 1;
/* Otherwise decode the length-prefixed id string: format 2. */
4430 } else if (ret > sizeof (__le32)) {
4433 image_id = ceph_extract_encoded_string(&p, p + ret,
4435 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4437 rbd_dev->image_format = 2;
4443 rbd_dev->spec->image_id = image_id;
4444 dout("image_id is %s\n", image_id);
4453 /* Undo whatever state changes are made by v1 or v2 image probe */
4455 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4457 struct rbd_image_header *header;
/* Tear down the parent chain and drop our parent spec reference. */
4459 rbd_dev_remove_parent(rbd_dev);
4460 rbd_spec_put(rbd_dev->parent_spec);
4461 rbd_dev->parent_spec = NULL;
4462 rbd_dev->parent_overlap = 0;
4464 /* Free dynamic fields from the header, then zero it out */
4466 header = &rbd_dev->header;
4467 ceph_put_snap_context(header->snapc);
4468 kfree(header->snap_sizes);
4469 kfree(header->snap_names);
4470 kfree(header->object_prefix);
/* Zeroing also clears object_prefix, which v2 probe uses as its
 * "first time" flag (see rbd_dev_v2_header_info). */
4471 memset(header, 0, sizeof (*header));
/*
 * Fetch the format 2 header fields that never change for the life of
 * the mapping: object prefix, features, parent info (if layered) and
 * striping parameters (if striped).  Called once, on first probe.
 */
4474 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4478 ret = rbd_dev_v2_object_prefix(rbd_dev);
4483 * Get the and check features for the image. Currently the
4484 * features are assumed to never change.
4486 ret = rbd_dev_v2_features(rbd_dev);
4490 /* If the image supports layering, get the parent info */
4492 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4493 ret = rbd_dev_v2_parent_info(rbd_dev);
4497 * Print a warning if this image has a parent.
4498 * Don't print it if the image now being probed
4499 * is itself a parent. We can tell at this point
4500 * because we won't know its pool name yet (just its
4503 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4504 rbd_warn(rbd_dev, "WARNING: kernel layering "
4505 "is EXPERIMENTAL!");
4508 /* If the image supports fancy striping, get its parameters */
4510 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4511 ret = rbd_dev_v2_striping_info(rbd_dev);
4515 /* No support for crypto and compression type format 2 images */
/* Error unwind: undo the parent/header state set above. */
4519 rbd_dev->parent_overlap = 0;
4520 rbd_spec_put(rbd_dev->parent_spec);
4521 rbd_dev->parent_spec = NULL;
4522 kfree(rbd_dev->header_name);
4523 rbd_dev->header_name = NULL;
4524 kfree(rbd_dev->header.object_prefix);
4525 rbd_dev->header.object_prefix = NULL;
/*
 * If this image has a parent (layering), create and probe an rbd_dev
 * for it, recursively building the parent chain.  No-op when there is
 * no parent spec.
 */
4530 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4532 struct rbd_device *parent = NULL;
4533 struct rbd_spec *parent_spec;
4534 struct rbd_client *rbdc;
4537 if (!rbd_dev->parent_spec)
4540 * We need to pass a reference to the client and the parent
4541 * spec when creating the parent rbd_dev. Images related by
4542 * parent/child relationships always share both.
4544 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4545 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4548 parent = rbd_dev_create(rbdc, parent_spec);
/* Recursive probe: the parent may itself have a parent. */
4552 ret = rbd_dev_image_probe(parent);
4555 rbd_dev->parent = parent;
/* Error unwind: drop the references taken above and destroy the
 * partially constructed parent device. */
4560 rbd_spec_put(rbd_dev->parent_spec);
4561 kfree(rbd_dev->header_name);
4562 rbd_dev_destroy(parent);
4564 rbd_put_client(rbdc);
4565 rbd_spec_put(parent_spec);
/*
 * Set up the Linux block-device side of a probed image: allocate a
 * device id, register a block major, create the gendisk, establish
 * the mapping, register on the rbd bus, and finally announce the disk.
 */
4571 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4575 /* generate unique id: find highest unique id, add one */
4576 rbd_dev_id_get(rbd_dev)\u003b
4578 /* Fill in the device name, now that we have its id. */
4579 BUILD_BUG_ON(DEV_NAME_LEN
4580 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4581 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4583 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number. */
4585 ret = register_blkdev(0, rbd_dev->name);
4588 rbd_dev->major = ret;
4590 /* Set up the blkdev mapping. */
4592 ret = rbd_init_disk(rbd_dev);
4594 goto err_out_blkdev;
4596 ret = rbd_dev_mapping_set(rbd_dev);
4599 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4601 ret = rbd_bus_add_dev(rbd_dev);
4603 goto err_out_mapping;
4605 /* Everything's ready. Announce the disk to the world. */
4607 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4608 add_disk(rbd_dev->disk);
4610 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4611 (unsigned long long) rbd_dev->mapping.size);
/* Error unwind, innermost failure first. */
4616 rbd_dev_mapping_clear(rbd_dev);
4618 rbd_free_disk(rbd_dev);
4620 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4622 rbd_dev_id_put(rbd_dev);
/* NOTE(review): mapping is also cleared above (4616), so some error
 * paths appear to clear it twice — presumably idempotent; confirm. */
4623 rbd_dev_mapping_clear(rbd_dev);
/*
 * Build and record the name of the header object for this image:
 * "<image_name><RBD_SUFFIX>" for format 1, or
 * "<RBD_HEADER_PREFIX><image_id>" for format 2.
 */
4628 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4630 struct rbd_spec *spec = rbd_dev->spec;
4633 /* Record the header object name for this rbd image. */
4635 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
/* sizeof() of the string constants includes the trailing NUL, so
 * these sizes cover the full formatted name. */
4637 if (rbd_dev->image_format == 1)
4638 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4640 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4642 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4643 if (!rbd_dev->header_name)
4646 if (rbd_dev->image_format == 1)
4647 sprintf(rbd_dev->header_name, "%s%s",
4648 spec->image_name, RBD_SUFFIX);
4650 sprintf(rbd_dev->header_name, "%s%s",
4651 RBD_HEADER_PREFIX, spec->image_id);
/*
 * Release everything rbd_dev_image_probe() set up: undo the probe,
 * cancel the header watch, free the header name and image id, and
 * finally destroy the device structure itself.
 */
4655 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4659 rbd_dev_unprobe(rbd_dev);
/* Second argument 0 = tear down (unwatch) the header watch. */
4660 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4662 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4663 kfree(rbd_dev->header_name);
4664 rbd_dev->header_name = NULL;
4665 rbd_dev->image_format = 0;
4666 kfree(rbd_dev->spec->image_id);
4667 rbd_dev->spec->image_id = NULL;
4669 rbd_dev_destroy(rbd_dev);
4673 * Probe for the existence of the header object for the given rbd
4676 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4682 * Get the id from the image id object. If it's not a
4683 * format 2 image, we'll get ENOENT back, and we'll assume
4684 * it's a format 1 image.
4686 ret = rbd_dev_image_id(rbd_dev);
4689 rbd_assert(rbd_dev->spec->image_id);
4690 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4692 ret = rbd_dev_header_name(rbd_dev);
4694 goto err_out_format;
/* Second argument 1 = establish the header watch. */
4696 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4698 goto out_header_name;
/* Format-specific header fetch fills rbd_dev->header. */
4700 if (rbd_dev->image_format == 1)
4701 ret = rbd_dev_v1_header_info(rbd_dev);
4703 ret = rbd_dev_v2_header_info(rbd_dev);
4707 ret = rbd_dev_spec_update(rbd_dev);
4711 ret = rbd_dev_probe_parent(rbd_dev);
4715 dout("discovered format %u image, header name is %s\n",
4716 rbd_dev->image_format, rbd_dev->header_name);
/* Error unwind mirrors the setup order above. */
4720 rbd_dev_unprobe(rbd_dev);
4722 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4724 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4726 kfree(rbd_dev->header_name);
4727 rbd_dev->header_name = NULL;
4729 rbd_dev->image_format = 0;
4730 kfree(rbd_dev->spec->image_id);
4731 rbd_dev->spec->image_id = NULL;
4733 dout("probe failed, returning %d\n", ret);
/*
 * sysfs "add" handler (/sys/bus/rbd/add): parse the user's mapping
 * request, connect a ceph client, resolve the pool, probe the image
 * and set up the block device.  Takes a module reference for the
 * lifetime of the mapping.
 */
4738 static ssize_t rbd_add(struct bus_type *bus,
4742 struct rbd_device *rbd_dev = NULL;
4743 struct ceph_options *ceph_opts = NULL;
4744 struct rbd_options *rbd_opts = NULL;
4745 struct rbd_spec *spec = NULL;
4746 struct rbd_client *rbdc;
4747 struct ceph_osd_client *osdc;
4751 if (!try_module_get(THIS_MODULE))
4754 /* parse add command */
4755 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4757 goto err_out_module;
/* rbd_opts is only needed for read_only; free-able after this. */
4758 read_only = rbd_opts->read_only;
4760 rbd_opts = NULL; /* done with this */
4762 rbdc = rbd_get_client(ceph_opts);
4767 ceph_opts = NULL; /* rbd_dev client now owns this */
4770 osdc = &rbdc->client->osdc;
4771 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4773 goto err_out_client;
4774 spec->pool_id = (u64)rc;
4776 /* The ceph file layout needs to fit pool id in 32 bits */
4778 if (spec->pool_id > (u64)U32_MAX) {
4779 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4780 (unsigned long long)spec->pool_id, U32_MAX);
4782 goto err_out_client;
4785 rbd_dev = rbd_dev_create(rbdc, spec);
4787 goto err_out_client;
/* Ownership of the client and spec references moves to rbd_dev. */
4788 rbdc = NULL; /* rbd_dev now owns this */
4789 spec = NULL; /* rbd_dev now owns this */
4791 rc = rbd_dev_image_probe(rbd_dev);
4793 goto err_out_rbd_dev;
4795 /* If we are mapping a snapshot it must be marked read-only */
4797 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4799 rbd_dev->mapping.read_only = read_only;
4801 rc = rbd_dev_device_setup(rbd_dev);
/* Error unwind, releasing whatever this function still owns. */
4805 rbd_dev_image_release(rbd_dev);
4807 rbd_dev_destroy(rbd_dev);
4809 rbd_put_client(rbdc);
4812 ceph_destroy_options(ceph_opts);
4816 module_put(THIS_MODULE);
4818 dout("Error adding device %s\n", buf);
/*
 * Find the rbd device with the given dev_id on the global list, or
 * NULL if no such device exists.  The list spinlock is held only for
 * the walk; the caller is responsible for the device's lifetime.
 */
4823 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4825 struct list_head *tmp;
4826 struct rbd_device *rbd_dev;
4828 spin_lock(&rbd_dev_list_lock);
4829 list_for_each(tmp, &rbd_dev_list) {
4830 rbd_dev = list_entry(tmp, struct rbd_device, node);
4831 if (rbd_dev->dev_id == dev_id) {
4832 spin_unlock(&rbd_dev_list_lock);
4836 spin_unlock(&rbd_dev_list_lock);
/*
 * Driver-model release callback (set in rbd_bus_add_dev): tear down
 * the block-device side — disk, mapping, blkdev major, device id.
 */
4840 static void rbd_dev_device_release(struct device *dev)
4842 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4844 rbd_free_disk(rbd_dev);
4845 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4846 rbd_dev_mapping_clear(rbd_dev);
4847 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4849 rbd_dev_id_put(rbd_dev);
/* NOTE(review): mapping was already cleared above (4846); this second
 * clear looks redundant — presumably idempotent; confirm. */
4850 rbd_dev_mapping_clear(rbd_dev);
/*
 * Dismantle the parent chain of an rbd device.  Each pass walks to
 * the deepest ancestor (the one with no grandparent) and releases it,
 * so the chain is torn down leaf-most ancestor first.
 */
4853 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4855 while (rbd_dev->parent) {
4856 struct rbd_device *first = rbd_dev;
4857 struct rbd_device *second = first->parent;
4858 struct rbd_device *third;
4861 * Follow to the parent with no grandparent and
4864 while (second && (third = second->parent)) {
/* "second" is now the deepest ancestor; release it and detach. */
4869 rbd_dev_image_release(second);
4870 first->parent = NULL;
4871 first->parent_overlap = 0;
4873 rbd_assert(first->parent_spec);
4874 rbd_spec_put(first->parent_spec);
4875 first->parent_spec = NULL;
/*
 * sysfs "remove" handler (/sys/bus/rbd/remove): parse the target
 * device id, refuse removal while the device is open, then unregister
 * it and release the image.  Serialized by ctl_mutex.
 */
4879 static ssize_t rbd_remove(struct bus_type *bus,
4883 struct rbd_device *rbd_dev = NULL;
/* NOTE(review): strict_strtoul is the deprecated predecessor of
 * kstrtoul; consider converting when touching this code. */
4888 ret = strict_strtoul(buf, 10, &ul);
4892 /* convert to int; abort if we lost anything in the conversion */
4893 target_id = (int) ul;
4894 if (target_id != ul)
4897 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4899 rbd_dev = __rbd_get_dev(target_id);
/* An open device cannot be removed; otherwise mark it REMOVING so
 * new opens are rejected from here on. */
4905 spin_lock_irq(&rbd_dev->lock);
4906 if (rbd_dev->open_count)
4909 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4910 spin_unlock_irq(&rbd_dev->lock);
4914 rbd_bus_del_dev(rbd_dev);
4915 rbd_dev_image_release(rbd_dev);
/* Drop the module reference taken in rbd_add(). */
4916 module_put(THIS_MODULE);
4918 mutex_unlock(&ctl_mutex);
4924 * create control files in sysfs
/* Register the root device, then the rbd bus; unwind the root device
 * registration if bus registration fails. */
4927 static int rbd_sysfs_init(void)
4931 ret = device_register(&rbd_root_dev);
4935 ret = bus_register(&rbd_bus_type);
4937 device_unregister(&rbd_root_dev);
/* Undo rbd_sysfs_init(), in reverse registration order. */
4942 static void rbd_sysfs_cleanup(void)
4944 bus_unregister(&rbd_bus_type);
4945 device_unregister(&rbd_root_dev);
/*
 * Create the three slab caches used by the I/O path (image requests,
 * object requests, segment names).  On failure, any caches already
 * created are destroyed before returning an error.
 */
4948 static int rbd_slab_init(void)
4950 rbd_assert(!rbd_img_request_cache);
4951 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4952 sizeof (struct rbd_img_request),
4953 __alignof__(struct rbd_img_request),
4955 if (!rbd_img_request_cache)
4958 rbd_assert(!rbd_obj_request_cache);
4959 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4960 sizeof (struct rbd_obj_request),
4961 __alignof__(struct rbd_obj_request),
4963 if (!rbd_obj_request_cache)
4966 rbd_assert(!rbd_segment_name_cache);
4967 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4968 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
/* All three caches created: success. */
4969 if (rbd_segment_name_cache)
/* Failure unwind: destroy whichever earlier caches were created. */
4972 if (rbd_obj_request_cache) {
4973 kmem_cache_destroy(rbd_obj_request_cache);
4974 rbd_obj_request_cache = NULL;
4977 kmem_cache_destroy(rbd_img_request_cache);
4978 rbd_img_request_cache = NULL;
/* Destroy the slab caches created by rbd_slab_init(); each must
 * exist at this point (asserted). */
4983 static void rbd_slab_exit(void)
4985 rbd_assert(rbd_segment_name_cache);
4986 kmem_cache_destroy(rbd_segment_name_cache);
4987 rbd_segment_name_cache = NULL;
4989 rbd_assert(rbd_obj_request_cache);
4990 kmem_cache_destroy(rbd_obj_request_cache);
4991 rbd_obj_request_cache = NULL;
4993 rbd_assert(rbd_img_request_cache);
4994 kmem_cache_destroy(rbd_img_request_cache);
4995 rbd_img_request_cache = NULL;
/*
 * Module entry point: verify libceph compatibility, create the slab
 * caches, then register the sysfs bus interface.
 */
4998 static int __init rbd_init(void)
5002 if (!libceph_compatible(NULL)) {
5003 rbd_warn(NULL, "libceph incompatibility (quitting)");
5007 rc = rbd_slab_init();
5010 rc = rbd_sysfs_init();
5014 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit point: remove the sysfs interface (slab teardown not
 * visible in this extract). */
5019 static void __exit rbd_exit(void)
5021 rbd_sysfs_cleanup();
5025 module_init(rbd_init);
5026 module_exit(rbd_exit);
5028 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5029 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5030 MODULE_DESCRIPTION("rados block device");
5032 /* following authorship retained from original osdblk.c */
5033 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5035 MODULE_LICENSE("GPL");