/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
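
/*
 * Editorial example: a 4 KiB request covers 4096 >> SECTOR_SHIFT = 8
 * sectors, and sector N begins at byte offset N * SECTOR_SIZE.
 */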
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}
/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
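
/*
 * Taken together, these two helpers implement a saturating reference
 * count: once the counter has hit 0 (or would pass INT_MAX) it stays
 * put and -EINVAL is returned instead.  They back rbd_dev->parent_ref;
 * see rbd_dev_parent_get() and rbd_dev_parent_put() below.
 */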
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
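
/*
 * Worked example (editorial): with sizeof (int) == 4 this evaluates
 * to (5 * 4) / 2 + 1 = 11, i.e. roughly 2.5 decimal digits per byte
 * plus one character for a sign -- enough for "-2147483648".
 */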
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
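
/*
 * Typical use of these iterators (editorial example), as in
 * rbd_img_request_complete() below:
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */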
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	NULL,
};
ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}
static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
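
/*
 * Editorial sketch: these options arrive in the options field of the
 * string written to /sys/bus/rbd/add, e.g. something along the lines
 * of the following (see Documentation/ABI/testing/sysfs-bus-rbd for
 * the authoritative format):
 *
 *	$ echo "1.2.3.4:6789 name=admin,read_only rbd myimage" \
 *		> /sys/bus/rbd/add
 */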
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
/*
 * Destroy ceph client.  Called via kref_put() when the last
 * reference is dropped; takes rbd_client_list_lock itself to
 * unlink the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */
	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */
	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
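
/*
 * Worked example (editorial): with snapc->snaps = { 12, 7, 3 }
 * (descending, newest first), looking up snap_id 7 yields index 1,
 * while snap_id 5 is absent and yields BAD_SNAP_INDEX.
 */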
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
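
/*
 * Worked example (editorial): with the default object order of 22
 * (4 MiB objects), image offset 5 MiB lands in segment 1 at segment
 * offset 1 MiB, and a 6 MiB request starting there is clamped to
 * the 3 MiB remaining in that object.
 */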
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_end_io = NULL;
	bio->bi_private = NULL;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}
static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}
static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);

		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
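
/*
 * Editorial note: completions are delivered to the block layer in
 * order.  If four object requests finish as 2, 0, 3, 1, the sweep
 * above advances next_completion to 1 when "0" arrives, then all the
 * way to 4 when "1" arrives (consuming the already-done 2 and 3), at
 * which point the image request itself completes.
 */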
2156 * Split up an image request into one or more object requests, each
2157 * to a different object. The "type" parameter indicates whether
2158 * "data_desc" is the pointer to the head of a list of bio
2159 * structures, or the base of a page array. In either case this
2160 * function assumes data_desc describes memory sufficient to hold
2161 * all data described by the image request.
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
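/*
 * Illustrative sketch (not part of the driver): how an image
 * extent maps onto objects.  Each object covers a power-of-two
 * range of the image determined by obj_order, so for a given
 * image offset the segment number, the offset within it, and
 * the length that fits before the segment boundary follow from
 * shift/mask arithmetic (this mirrors what the rbd_segment_*()
 * helpers used above are expected to compute):
 *
 *	u64 segment_size = (u64)1 << rbd_dev->header.obj_order;
 *	u64 segment = img_offset >> rbd_dev->header.obj_order;
 *	u64 offset = img_offset & (segment_size - 1);
 *	u64 length = min(resid, segment_size - offset);
 *
 * With 4 MB objects (obj_order 22), a 6 MB request at image
 * offset 3 MB becomes three object requests: 1 MB at offset
 * 3 MB in segment 0, all 4 MB of segment 1, and 1 MB at offset
 * 0 in segment 2.
 */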
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
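/*
 * Copyup path summary (illustrative): a layered write to an
 * object that does not yet exist in the clone becomes
 *
 *   1. a full-object read of the corresponding range from the
 *	parent image (rbd_img_obj_parent_read_full() above), then
 *   2. a single two-op OSD request built in its callback:
 *	   op 0: CEPH_OSD_OP_CALL "rbd" "copyup", carrying the
 *		 parent data as request data
 *	   op 1: the original CEPH_OSD_OP_WRITE
 *
 * The "copyup" class method writes the supplied parent data
 * only if the target object still does not exist, so the write
 * in op 1 always lands on an object already populated with
 * parent data, even if another client raced with us.
 */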
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
}
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
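/*
 * Illustrative sketch (hypothetical; the existence check above
 * only looks at the op result and never parses the reply): if
 * the STAT payload ever needed examining, it could be pulled
 * out of the page vector along these lines:
 *
 *	struct {
 *		__le64 length;
 *		__le32 tv_sec;
 *		__le32 tv_nsec;
 *	} __attribute__ ((packed)) stat_buf;
 *
 *	ceph_copy_from_page_vector(pages, &stat_buf, 0,
 *					sizeof (stat_buf));
 *
 * A result of 0 means the object exists (length and mtime are
 * valid); -ENOENT means it does not.
 */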
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
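/*
 * Decision summary for rbd_img_obj_request_submit():
 *
 *	read				-> plain object request
 *	write, image not layered	-> plain object request
 *	write beyond parent overlap	-> plain object request
 *	write, target known to exist	-> plain object request
 *	write, target known missing	-> parent read + copyup
 *	write, existence unknown	-> STAT first, resubmit
 */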
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}
static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
out:
	rbd_obj_request_put(obj_request);

	return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
}
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
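/*
 * Usage sketch (illustrative): the watch is established after
 * the image header has been probed and is torn down
 * symmetrically:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, true);
 *	if (ret)
 *		return ret;
 *	...
 *	rbd_dev_header_watch_sync(rbd_dev, false);
 *
 * While the watch is active, header changes made by other
 * clients (snapshot create, resize, flatten) trigger
 * rbd_watch_cb(), which refreshes the header and acknowledges
 * the notification via rbd_obj_notify_ack_sync().
 */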
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
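/*
 * Usage sketch (illustrative): the v2 metadata helpers below
 * are all thin wrappers around rbd_obj_method_sync().  Fetching
 * the size of the base image, for example, amounts to:
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *
 * On success ret is the number of bytes placed in the inbound
 * buffer, which is why callers check ret < sizeof (size_buf)
 * to reject short replies.
 */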
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */
		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */
		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);
		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */
		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}
/*
 * A queue callback.  Makes sure we don't create a bio that spans
 * multiple osd objects.  One exception is single-page bios, which
 * we handle later in bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the bio start sector
	 * falls.  The bio sector is partition-relative, so first
	 * make it an offset relative to the enclosing device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
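/*
 * Worked example (illustrative): with the default object order
 * of 22 (4 MB objects), sectors_per_obj is 8192.  A bio that
 * starts at device sector 8188 and already carries 512 bytes
 * has (8192 - 8188) << SECTOR_SHIFT = 2048 bytes to the object
 * boundary, of which 512 are used, so at most 1536 more bytes
 * may be added.  An empty bio is the special case: it is always
 * allowed its first bvec, per the rule quoted above.
 */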
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int) size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
					0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}
/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}
static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;
	bool removing;

	/*
	 * Don't hold the lock while doing disk operations,
	 * or lock ordering will conflict with the bdev mutex via:
	 * rbd_add() -> blkdev_get() -> rbd_open()
	 */
	spin_lock_irq(&rbd_dev->lock);
	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	/*
	 * If the device is being removed, rbd_dev->disk has
	 * been destroyed, so don't try to update its size.
	 */
	if (!removing) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size) {
		rbd_dev_update_size(rbd_dev);
	}

	return ret;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);

	return ret < 0 ? ret : size;
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	atomic_set(&rbd_dev->parent_ref, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout("  order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx snap_size = %llu\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			smp_mb();
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
			(unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we have not already done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	}

	/*
	 * We always update the parent overlap.  If it's zero we
	 * treat it specially.
	 */
	rbd_dev->parent_overlap = overlap;
	smp_mb();
	if (!overlap) {

		/* A null parent_spec indicates it's the initial probe */

		if (parent_spec) {
			/*
			 * The overlap has become zero, so the clone
			 * must have been resized down to 0 at some
			 * point.  Treat this the same as a flatten.
			 */
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image now standalone\n",
				rbd_dev->disk->disk_name);
		} else {
			/*
			 * For the initial probe, if we find the
			 * overlap is zero we just pretend there was
			 * no parent image.
			 */
			rbd_warn(rbd_dev, "ignoring parent of "
						"clone with overlap 0\n");
		}
	}
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}
static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}

	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 snap_id;
	u32 which;
	bool found = false;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}
/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			u64 snap_id;

			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
			if (snap_id == CEPH_NOSNAP)
				return -ENOENT;
			spec->snap_id = snap_id;
		} else {
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
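/*
 * Reply layout sketch (illustrative): the get_snapcontext reply
 * decoded above is, on the wire,
 *
 *	__le64 seq;			highest snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids
 *
 * which is why the buffer is sized for a fixed maximum of
 * RBD_MAX_SNAP_COUNT (510) ids: 8 + 4 + 510 * 8 = 4092 bytes,
 * so the whole reply fits in a single 4 KB page.
 */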
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	/*
	 * If the image supports layering, get the parent info.  We
	 * need to probe the first time regardless.  Thereafter we
	 * only need to if there's a parent, to see if it has
	 * disappeared due to the mapped image getting flattened.
	 */
	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
			(first_time || rbd_dev->parent_spec)) {
		bool warn;

		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			return ret;

		/*
		 * Print a warning if this is the initial probe and
		 * the image has a parent.  Don't print it if the
		 * image now being probed is itself a parent.  We
		 * can tell at this point because we won't know its
		 * pool name yet (just its pool id).
		 */
		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
		if (first_time && warn)
			rbd_warn(rbd_dev, "WARNING: kernel layering "
					"is EXPERIMENTAL!");
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	ret = rbd_dev_v2_snap_context(rbd_dev);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);

	return ret;
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_device_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
}
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
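/*
 * Worked example (illustrative): given
 *
 *	const char *p = "  1.2.3.4:6789 name=admin rbd foo";
 *	size_t len = next_token(&p);
 *
 * p is left pointing at "1.2.3.4:6789 ..." and len is 12, the
 * length of that first token.  Note that next_token() does not
 * advance past the token itself; copy_token() and dup_token()
 * below do that after copying it.
 */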
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_id>
 *	An optional snapshot id.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot id is
 *	provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
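/*
 * For illustration (names hypothetical, assuming the usual
 * RBD_ID_PREFIX of "rbd_id."): a v2 image named "foo" has an id
 * object "rbd_id.foo"; if the id stored there is "1028b4567", the
 * objects belonging to the image are thereafter named after that
 * id rather than after "foo".
 */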
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */
        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */
        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                  "rbd", "get_id", NULL, 0,
                                  response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret > sizeof (__le32)) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                       NULL, GFP_NOIO);
                ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
                ret = -EINVAL;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * routines.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        /* Drop parent reference unless it's already been done (or none) */
        if (rbd_dev->parent_overlap)
                rbd_dev_parent_put(rbd_dev);

        /* Free dynamic fields from the header, then zero it out */
        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports fancy striping, get its parameters */
        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }
        /* No support for crypto and compression type format 2 images */

        return 0;
out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec;
        struct rbd_client *rbdc;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;
        /*
         * We need to pass a reference to the client and the parent
         * spec when creating the parent rbd_dev.  Images related by
         * parent/child relationships always share both.
         */
        parent_spec = rbd_spec_get(rbd_dev->parent_spec);
        rbdc = __rbd_get_client(rbd_dev->rbd_client);

        ret = -ENOMEM;
        parent = rbd_dev_create(rbdc, parent_spec);
        if (!parent)
                goto out_err;

        ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;
        atomic_set(&rbd_dev->parent_ref, 1);

        return 0;
out_err:
        if (parent) {
                rbd_dev_unparent(rbd_dev);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
                rbd_put_client(rbdc);
                rbd_spec_put(parent_spec);
        }

        return ret;
}
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */
        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */
        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_mapping;

        /* Everything's ready.  Announce the disk to the world. */
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
        rbd_dev_mapping_clear(rbd_dev);

        return ret;
}
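/*
 * Once setup succeeds, the mapping appears as a block device named
 * after the id chosen above (e.g. /dev/rbd0 for dev_id 0, path
 * illustrative) and under /sys/bus/rbd/devices/<dev_id>/; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the attribute files.
 */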
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);

        return 0;
}
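/*
 * Illustrative only (names hypothetical, assuming the usual
 * RBD_SUFFIX of ".rbd" and RBD_HEADER_PREFIX of "rbd_header."):
 * a format 1 image "foo" gets header object "foo.rbd", while a
 * format 2 image whose id is "1028b4567" gets header object
 * "rbd_header.1028b4567".
 */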
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  Unless there's an
         * error, rbd_dev->spec->image_id will be filled in with
         * a dynamically-allocated string, and rbd_dev->image_format
         * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (mapping) {
                ret = rbd_dev_header_watch_sync(rbd_dev, true);
                if (ret)
                        goto out_header_name;
        }

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
                ret = rbd_dev_v2_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);

        return 0;
err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (mapping) {
                tmp = rbd_dev_header_watch_sync(rbd_dev, false);
                if (tmp)
                        rbd_warn(rbd_dev, "unable to tear down "
                                "watch request (%d)\n", tmp);
        }
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);

        return ret;
}
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
        read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */
        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;

        /* If we are mapping a snapshot it must be marked read-only */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                rbd_dev_image_release(rbd_dev);
                goto err_out_module;
        }

        return count;

err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}
static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
        rbd_dev_mapping_clear(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        unsigned long ul;
        bool already = false;
        int ret;

        ret = kstrtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        dev_id = (int)ul;
        if (dev_id != ul)
                return -EINVAL;

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                                        &rbd_dev->flags);
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret < 0 || already)
                return ret;

        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);

        /*
         * Flush remaining watch callbacks - these must be complete
         * before the osd_client is shut down.
         */
        dout("%s: flushing notifies", __func__);
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
        /*
         * Don't free anything from rbd_dev->disk until after all
         * notifies are completely processed.  Otherwise
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
        rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);

        return count;
}
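/*
 * Illustrative example (device id hypothetical): a mapping created
 * via /sys/bus/rbd/add is torn down by writing its device id back:
 *
 *      echo 0 > /sys/bus/rbd/remove
 *
 * As the open_count check above implies, the write fails with
 * -EBUSY while the device is still held open.
 */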
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}
static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }
        rc = rbd_slab_init();
        if (rc)
                return rc;
        rc = rbd_sysfs_init();
        if (rc)
                rbd_slab_exit();
        else
                pr_info("loaded\n");

        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        rbd_slab_exit();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");