/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
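
/*
 * Worked example (illustrative): with NAME_MAX == 255, and with
 * sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1 == 5 dropping the trailing
 * NUL of "snap_", the longest representable snapshot name is
 * 255 - 5 = 250 bytes.
 */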
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
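
/*
 * Worked example (illustrative): for a 32-bit int the formula gives
 * (5 * 4) / 2 + 1 == 11 characters, exactly enough for the widest
 * value, "-2147483648" (sign plus ten digits).  The 5/2 factor is a
 * safe over-estimate of log10(256) ~= 2.41 decimal digits per byte.
 */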
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * An instance of the client; multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_request_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_request_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define	for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define	for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define	for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
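
/*
 * Usage sketch (illustrative, not part of the original source): the
 * _safe variant walks in reverse and caches the next entry, so the
 * current object request may be removed from the list mid-walk, e.g.:
 *
 *	struct rbd_obj_request *obj_request;
 *	struct rbd_obj_request *next_obj_request;
 *
 *	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 *		rbd_img_obj_request_del(img_request, obj_request);
 *
 * This mirrors what rbd_img_request_destroy() does below.
 */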
struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -EBUSY;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
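
/*
 * Example (illustrative; the authoritative format is described in
 * Documentation/ABI/testing/sysfs-bus-rbd): rbd-specific options
 * such as "ro" ride along in the options field of the map request,
 * e.g. from the shell:
 *
 *	# echo "1.2.3.4:6789 name=admin,ro rbd myimage" \
 *		> /sys/bus/rbd/add
 */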
/*
 * Get a ceph client with a specific addr and configuration; create
 * one if it doesn't exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node.  If it's not referenced anymore,
 * release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Create a new header structure, translate header format from the
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			goto out_err;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
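
/*
 * Worked example (illustrative): for snapc->snaps = { 12, 7, 3 }
 * (descending, newest first), a bsearch() for id 7 lands on index 1;
 * snapid_compare_reverse() steers the probe rightward when the key
 * is *smaller* than the probed entry, matching the reverse sort.
 * Searching for id 5 finds nothing and yields BAD_SNAP_INDEX.
 */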
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	/* If we are mapping a snapshot it must be marked read-only */

	if (snap_id != CEPH_NOSNAP)
		rbd_dev->mapping.read_only = true;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;
	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
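
/*
 * Worked example (illustrative; the prefix shown is hypothetical):
 * with the default object order of 22 (4 MiB objects) and
 * object_prefix "rb.0.1234.6b8b4567", image byte offset 0x1234567
 * maps to:
 *
 *	segment = 0x1234567 >> 22 = 4
 *	object name = "rb.0.1234.6b8b4567.000000000004"
 *	offset within object = 0x1234567 & 0x3fffff = 0x234567
 */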
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
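
/*
 * Worked example (illustrative): given a two-bio chain of 4096 bytes
 * each, a call with *offset == 3072 and len == 2048 clones the last
 * 1024 bytes of the first bio and the first 1024 bytes of the
 * second.  On return *bio_src points at the second bio and *offset
 * is 1024, ready for a subsequent call to continue from there.
 */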
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}
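
/*
 * Example timeline (illustrative) of the race described above:
 *
 *	check #1 sent ----------------> reply "doesn't exist" (late)
 *		       object created
 *	check #2 sent --> reply "exists"; EXISTS and KNOWN get set
 *
 * When the late "doesn't exist" reply finally arrives, KNOWN is
 * already set and EXISTS is deliberately left alone, since this
 * function only ever sets bits.
 */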
static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops:
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
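
/*
 * Layout sketch (illustrative) of the two-op copyup request built
 * above and filled in by rbd_img_obj_parent_read_full_callback():
 *
 *	op 0: CEPH_OSD_OP_CALL  "rbd"/"copyup", carrying parent data
 *	op 1: CEPH_OSD_OP_WRITE the client's original write payload
 *
 * The copyup method only populates the object if it does not yet
 * exist, so the subsequent write always lands on a fully-backed
 * object.
 */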
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
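
/*
 * Worked example (illustrative): an image request with three object
 * requests (which == 0, 1, 2) whose completions arrive in the order
 * 2, 0, 1.  The callback for 2 sees next_completion == 0 and just
 * drops the lock.  The callback for 0 retires 0, stops at 1 (not
 * done yet) and records next_completion = 1.  The callback for 1
 * retires 1, then 2, and, with no more requests outstanding, calls
 * rbd_img_request_complete().
 */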
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
						&bio_offset,
						clone_size,
						GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
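
/*
 * Worked example (illustrative): with 4 MiB objects, an 8 MiB image
 * request starting at image offset 6 MiB covers image bytes
 * [6 MiB, 14 MiB) and is split into three object requests:
 *
 *	segment 1: object offset 2 MiB, length 2 MiB
 *	segment 2: object offset 0,     length 4 MiB
 *	segment 3: object offset 0,     length 2 MiB
 *
 * Each gets its own OSD op; completions are stitched back together
 * in order by rbd_img_obj_callback() above.
 */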
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}
2220 * Read from the parent image the range of data that covers the
2221 * entire target of the given object request. This is used for
2222 * satisfying a layered image write request when the target of an
2223 * object request from the image request does not exist.
2225 * A page array big enough to hold the returned data is allocated
2226 * and supplied to rbd_img_request_fill() as the "data descriptor."
2227 * When the read completes, this page array will be transferred to
2228 * the original object request for the copyup operation.
2230 * If an error occurs, record it as the result of the original
2231 * object request and mark it done so it gets completed.
2233 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2235 struct rbd_img_request *img_request = NULL;
2236 struct rbd_img_request *parent_request = NULL;
2237 struct rbd_device *rbd_dev;
2240 struct page **pages = NULL;
2244 rbd_assert(obj_request_img_data_test(obj_request));
2245 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2247 img_request = obj_request->img_request;
2248 rbd_assert(img_request != NULL);
2249 rbd_dev = img_request->rbd_dev;
2250 rbd_assert(rbd_dev->parent != NULL);
2253 * First things first. The original osd request is of no
2254 * use to us any more; we'll need a new one that can hold
2255 * the two ops in a copyup request. We'll get that later,
2256 * but for now we can release the old one.
2258 rbd_osd_req_destroy(obj_request->osd_req);
2259 obj_request->osd_req = NULL;
2262 * Determine the byte range covered by the object in the
2263 * child image to which the original request was to be sent.
2265 img_offset = obj_request->img_offset - obj_request->offset;
2266 length = (u64)1 << rbd_dev->header.obj_order;
2269 * There is no defined parent data beyond the parent
2270 * overlap, so limit what we read at that boundary if necessary.
2273 if (img_offset + length > rbd_dev->parent_overlap) {
2274 rbd_assert(img_offset < rbd_dev->parent_overlap);
2275 length = rbd_dev->parent_overlap - img_offset;
2279 * Allocate a page array big enough to receive the data read
2282 page_count = (u32)calc_pages_for(0, length);
2283 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2284 if (IS_ERR(pages)) {
2285 result = PTR_ERR(pages);
2291 parent_request = rbd_img_request_create(rbd_dev->parent,
2294 if (!parent_request)
2296 rbd_obj_request_get(obj_request);
2297 parent_request->obj_request = obj_request;
2299 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2302 parent_request->copyup_pages = pages;
2304 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2305 result = rbd_img_request_submit(parent_request);
2309 parent_request->copyup_pages = NULL;
2310 parent_request->obj_request = NULL;
2311 rbd_obj_request_put(obj_request);
2314 ceph_release_page_vector(pages, page_count);
2316 rbd_img_request_put(parent_request);
2317 obj_request->result = result;
2318 obj_request->xferred = 0;
2319 obj_request_done_set(obj_request);
2324 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2326 struct rbd_obj_request *orig_request;
2329 rbd_assert(!obj_request_img_data_test(obj_request));
2332 * All we need from the object request is the original
2333 * request and the result of the STAT op. Grab those, then
2334 * we're done with the request.
2336 orig_request = obj_request->obj_request;
2337 obj_request->obj_request = NULL;
2338 rbd_assert(orig_request);
2339 rbd_assert(orig_request->img_request);
2341 result = obj_request->result;
2342 obj_request->result = 0;
2344 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2345 obj_request, orig_request, result,
2346 obj_request->xferred, obj_request->length);
2347 rbd_obj_request_put(obj_request);
2349 rbd_assert(orig_request);
2350 rbd_assert(orig_request->img_request);
2353 * Our only purpose here is to determine whether the object
2354 * exists, and we don't want to treat the non-existence as
2355 * an error. If something else comes back, transfer the
2356 * error to the original request and complete it now.
2359 obj_request_existence_set(orig_request, true);
2360 } else if (result == -ENOENT) {
2361 obj_request_existence_set(orig_request, false);
2362 } else if (result) {
2363 orig_request->result = result;
2368 * Resubmit the original request now that we have recorded
2369 * whether the target object exists.
2371 orig_request->result = rbd_img_obj_request_submit(orig_request);
2373 if (orig_request->result)
2374 rbd_obj_request_complete(orig_request);
2375 rbd_obj_request_put(orig_request);
2378 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2380 struct rbd_obj_request *stat_request;
2381 struct rbd_device *rbd_dev;
2382 struct ceph_osd_client *osdc;
2383 struct page **pages = NULL;
2389 * The response data for a STAT call consists of a le64 length
 * followed by an mtime pair (le32 tv_sec, le32 tv_nsec):
2396 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2397 page_count = (u32)calc_pages_for(0, size);
2398 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2400 return PTR_ERR(pages);
2403 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2408 rbd_obj_request_get(obj_request);
2409 stat_request->obj_request = obj_request;
2410 stat_request->pages = pages;
2411 stat_request->page_count = page_count;
2413 rbd_assert(obj_request->img_request);
2414 rbd_dev = obj_request->img_request->rbd_dev;
2415 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2417 if (!stat_request->osd_req)
2419 stat_request->callback = rbd_img_obj_exists_callback;
2421 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2422 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2424 rbd_osd_req_format_read(stat_request);
2426 osdc = &rbd_dev->rbd_client->client->osdc;
2427 ret = rbd_obj_request_submit(osdc, stat_request);
2430 rbd_obj_request_put(obj_request);
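/*
 * A minimal sketch (not compiled in) of a structure mirroring the
 * STAT reply layout sized above, should one want to decode the reply
 * rather than just reserve room for it:
 */
#if 0
struct rbd_stat_reply {
	__le64 length;		/* object size in bytes */
	__le32 tv_sec;		/* mtime seconds */
	__le32 tv_nsec;		/* mtime nanoseconds */
} __attribute__ ((packed));
#endif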
2435 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2437 struct rbd_img_request *img_request;
2438 struct rbd_device *rbd_dev;
2441 rbd_assert(obj_request_img_data_test(obj_request));
2443 img_request = obj_request->img_request;
2444 rbd_assert(img_request);
2445 rbd_dev = img_request->rbd_dev;
2448 * Only writes to layered images need special handling.
2449 * Reads and non-layered writes are simple object requests.
2450 * Layered writes that start beyond the end of the overlap
2451 * with the parent have no parent data, so they too are
2452 * simple object requests. Finally, if the target object is
2453 * known to already exist, its parent data has already been
2454 * copied, so a write to the object can also be handled as a
2455 * simple object request.
2457 if (!img_request_write_test(img_request) ||
2458 !img_request_layered_test(img_request) ||
2459 rbd_dev->parent_overlap <= obj_request->img_offset ||
2460 ((known = obj_request_known_test(obj_request)) &&
2461 obj_request_exists_test(obj_request))) {
2463 struct rbd_device *rbd_dev;
2464 struct ceph_osd_client *osdc;
2466 rbd_dev = obj_request->img_request->rbd_dev;
2467 osdc = &rbd_dev->rbd_client->client->osdc;
2469 return rbd_obj_request_submit(osdc, obj_request);
2473 * It's a layered write. The target object might exist but
2474 * we may not know that yet. If we know it doesn't exist,
2475 * start by reading the data for the full target object from
2476 * the parent so we can use it for a copyup to the target.
2479 return rbd_img_obj_parent_read_full(obj_request);
2481 /* We don't know whether the target exists. Go find out. */
2483 return rbd_img_obj_exists_submit(obj_request);
2486 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2488 struct rbd_obj_request *obj_request;
2489 struct rbd_obj_request *next_obj_request;
2491 dout("%s: img %p\n", __func__, img_request);
2492 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2495 ret = rbd_img_obj_request_submit(obj_request);
2503 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2505 struct rbd_obj_request *obj_request;
2506 struct rbd_device *rbd_dev;
2509 rbd_assert(img_request_child_test(img_request));
2511 obj_request = img_request->obj_request;
2512 rbd_assert(obj_request);
2513 rbd_assert(obj_request->img_request);
2515 obj_request->result = img_request->result;
2516 if (obj_request->result)
2520 * We need to zero anything beyond the parent overlap
2521 * boundary. Since rbd_img_obj_request_read_callback()
2522 * will zero anything beyond the end of a short read, an
2523 * easy way to do this is to pretend the data from the
2524 * parent came up short--ending at the overlap boundary.
2526 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2527 obj_end = obj_request->img_offset + obj_request->length;
2528 rbd_dev = obj_request->img_request->rbd_dev;
2529 if (obj_end > rbd_dev->parent_overlap) {
2532 if (obj_request->img_offset < rbd_dev->parent_overlap)
2533 xferred = rbd_dev->parent_overlap -
2534 obj_request->img_offset;
2536 obj_request->xferred = min(img_request->xferred, xferred);
2538 obj_request->xferred = img_request->xferred;
2541 rbd_img_request_put(img_request);
2542 rbd_img_obj_request_read_callback(obj_request);
2543 rbd_obj_request_complete(obj_request);
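/*
 * Worked example (numbers illustrative only): with parent_overlap of
 * 1 MB and an object request covering img_offset 768 KB, length
 * 512 KB, obj_end is 1280 KB, beyond the overlap.  xferred is then
 * limited to 1 MB - 768 KB = 256 KB, so the read callback zeroes
 * everything past the overlap boundary.
 */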
2546 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2548 struct rbd_device *rbd_dev;
2549 struct rbd_img_request *img_request;
2552 rbd_assert(obj_request_img_data_test(obj_request));
2553 rbd_assert(obj_request->img_request != NULL);
2554 rbd_assert(obj_request->result == (s32) -ENOENT);
2555 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2557 rbd_dev = obj_request->img_request->rbd_dev;
2558 rbd_assert(rbd_dev->parent != NULL);
2559 /* rbd_read_finish(obj_request, obj_request->length); */
2560 img_request = rbd_img_request_create(rbd_dev->parent,
2561 obj_request->img_offset,
2562 obj_request->length,
2568 rbd_obj_request_get(obj_request);
2569 img_request->obj_request = obj_request;
2571 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2572 obj_request->bio_list);
2576 img_request->callback = rbd_img_parent_read_callback;
2577 result = rbd_img_request_submit(img_request);
2584 rbd_img_request_put(img_request);
2585 obj_request->result = result;
2586 obj_request->xferred = 0;
2587 obj_request_done_set(obj_request);
2590 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2592 struct rbd_obj_request *obj_request;
2593 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2596 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2597 OBJ_REQUEST_NODATA);
2602 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2603 if (!obj_request->osd_req)
2605 obj_request->callback = rbd_obj_request_put;
2607 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2609 rbd_osd_req_format_read(obj_request);
2611 ret = rbd_obj_request_submit(osdc, obj_request);
2614 rbd_obj_request_put(obj_request);
2619 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2621 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2627 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2628 rbd_dev->header_name, (unsigned long long)notify_id,
2629 (unsigned int)opcode);
2630 ret = rbd_dev_refresh(rbd_dev);
2632 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2634 rbd_obj_notify_ack(rbd_dev, notify_id);
2638 * Request sync osd watch/unwatch. The value of "start" determines
2639 * whether a watch request is being initiated or torn down.
2641 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2643 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2644 struct rbd_obj_request *obj_request;
2647 rbd_assert(start ^ !!rbd_dev->watch_event);
2648 rbd_assert(start ^ !!rbd_dev->watch_request);
2651 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2652 &rbd_dev->watch_event);
2655 rbd_assert(rbd_dev->watch_event != NULL);
2659 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2660 OBJ_REQUEST_NODATA);
2664 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2665 if (!obj_request->osd_req)
2669 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2671 ceph_osdc_unregister_linger_request(osdc,
2672 rbd_dev->watch_request->osd_req);
2674 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2675 rbd_dev->watch_event->cookie, 0, start);
2676 rbd_osd_req_format_write(obj_request);
2678 ret = rbd_obj_request_submit(osdc, obj_request);
2681 ret = rbd_obj_request_wait(obj_request);
2684 ret = obj_request->result;
2689 * A watch request is set to linger, so the underlying osd
2690 * request won't go away until we unregister it. We retain
2691 * a pointer to the object request during that time (in
2692 * rbd_dev->watch_request), so we'll keep a reference to
2693 * it. We'll drop that reference (below) after we've unregistered it.
2697 rbd_dev->watch_request = obj_request;
2702 /* We have successfully torn down the watch request */
2704 rbd_obj_request_put(rbd_dev->watch_request);
2705 rbd_dev->watch_request = NULL;
2707 /* Cancel the event if we're tearing down, or on error */
2708 ceph_osdc_cancel_event(rbd_dev->watch_event);
2709 rbd_dev->watch_event = NULL;
2711 rbd_obj_request_put(obj_request);
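/*
 * Usage sketch (hypothetical call sites; error handling omitted):
 * a watch is typically established when an image is mapped, and
 * torn down again when the device is released.
 */
#if 0
	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	/* set up watch */
	/* ... device lifetime ... */
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	/* tear it down */
#endif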
2717 * Synchronous osd object method call. Returns the number of bytes
2718 * returned in the outbound buffer, or a negative error code.
2720 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2721 const char *object_name,
2722 const char *class_name,
2723 const char *method_name,
2724 const void *outbound,
2725 size_t outbound_size,
2727 size_t inbound_size)
2729 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2730 struct rbd_obj_request *obj_request;
2731 struct page **pages;
2736 * Method calls are ultimately read operations. The result
2737 * should be placed into the inbound buffer provided. They
2738 * also supply outbound data--parameters for the object
2739 * method. Currently if this is present it will be a snapshot id.
2742 page_count = (u32)calc_pages_for(0, inbound_size);
2743 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2745 return PTR_ERR(pages);
2748 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2753 obj_request->pages = pages;
2754 obj_request->page_count = page_count;
2756 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2757 if (!obj_request->osd_req)
2760 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2761 class_name, method_name);
2762 if (outbound_size) {
2763 struct ceph_pagelist *pagelist;
2765 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2769 ceph_pagelist_init(pagelist);
2770 ceph_pagelist_append(pagelist, outbound, outbound_size);
2771 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2774 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2775 obj_request->pages, inbound_size,
2777 rbd_osd_req_format_read(obj_request);
2779 ret = rbd_obj_request_submit(osdc, obj_request);
2782 ret = rbd_obj_request_wait(obj_request);
2786 ret = obj_request->result;
2790 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2791 ret = (int)obj_request->xferred;
2792 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2795 rbd_obj_request_put(obj_request);
2797 ceph_release_page_vector(pages, page_count);
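/*
 * Minimal usage sketch, mirroring the call rbd_dev_v2_object_prefix()
 * makes below ("reply_buf" here is a placeholder): no outbound
 * parameters are supplied, and a non-negative return value is the
 * number of reply bytes copied into the inbound buffer.
 */
#if 0
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				  "rbd", "get_object_prefix", NULL, 0,
				  reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	if (ret < 0)
		return ret;
#endif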
2802 static void rbd_request_fn(struct request_queue *q)
2803 __releases(q->queue_lock) __acquires(q->queue_lock)
2805 struct rbd_device *rbd_dev = q->queuedata;
2806 bool read_only = rbd_dev->mapping.read_only;
2810 while ((rq = blk_fetch_request(q))) {
2811 bool write_request = rq_data_dir(rq) == WRITE;
2812 struct rbd_img_request *img_request;
2816 /* Ignore any non-FS requests that filter through. */
2818 if (rq->cmd_type != REQ_TYPE_FS) {
2819 dout("%s: non-fs request type %d\n", __func__,
2820 (int) rq->cmd_type);
2821 __blk_end_request_all(rq, 0);
2825 /* Ignore/skip any zero-length requests */
2827 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2828 length = (u64) blk_rq_bytes(rq);
2831 dout("%s: zero-length request\n", __func__);
2832 __blk_end_request_all(rq, 0);
2836 spin_unlock_irq(q->queue_lock);
2838 /* Disallow writes to a read-only device */
2840 if (write_request) {
2844 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2848 * Quit early if the mapped snapshot no longer
2849 * exists. It's still possible the snapshot will
2850 * have disappeared by the time our request arrives
2851 * at the osd, but there's no sense in sending it if we already know.
2854 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2855 dout("request for non-existent snapshot");
2856 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2862 if (offset && length > U64_MAX - offset + 1) {
2863 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2865 goto end_request; /* Shouldn't happen */
2869 if (offset + length > rbd_dev->mapping.size) {
2870 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2871 offset, length, rbd_dev->mapping.size);
2876 img_request = rbd_img_request_create(rbd_dev, offset, length,
2877 write_request, false);
2881 img_request->rq = rq;
2883 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2886 result = rbd_img_request_submit(img_request);
2888 rbd_img_request_put(img_request);
2890 spin_lock_irq(q->queue_lock);
2892 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2893 write_request ? "write" : "read",
2894 length, offset, result);
2896 __blk_end_request_all(rq, result);
2902 * A queue callback that makes sure we don't create a bio that spans
2903 * multiple osd objects. One exception is single-page bios,
2904 * which we handle later at bio_chain_clone_range().
2906 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2907 struct bio_vec *bvec)
2909 struct rbd_device *rbd_dev = q->queuedata;
2910 sector_t sector_offset;
2911 sector_t sectors_per_obj;
2912 sector_t obj_sector_offset;
2916 * Convert the partition-relative bio start sector to a device-relative
2917 * one, then find how far into its rbd object that sector falls.
2920 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2921 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2922 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2925 * Compute the number of bytes from that offset to the end
2926 * of the object. Account for what's already used by the bio.
2928 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2929 if (ret > bmd->bi_size)
2930 ret -= bmd->bi_size;
2935 * Don't send back more than was asked for. And if the bio
2936 * was empty, let the whole thing through because: "Note
2937 * that a block device *must* allow a single page to be
2938 * added to an empty bio."
2940 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2941 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2942 ret = (int) bvec->bv_len;
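/*
 * Worked example (illustrative): with 4 MB objects (obj_order 22),
 * sectors_per_obj = 1 << (22 - 9) = 8192.  For a bio starting at
 * device sector 8190 that already holds 512 bytes, the object
 * boundary lies (8192 - 8190) << 9 = 1024 bytes past the start, so
 * at most 1024 - 512 = 512 more bytes (further capped at bv_len)
 * can be merged before the bio would cross into the next object.
 */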
2947 static void rbd_free_disk(struct rbd_device *rbd_dev)
2949 struct gendisk *disk = rbd_dev->disk;
2954 rbd_dev->disk = NULL;
2955 if (disk->flags & GENHD_FL_UP) {
2958 blk_cleanup_queue(disk->queue);
2963 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2964 const char *object_name,
2965 u64 offset, u64 length, void *buf)
2968 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2969 struct rbd_obj_request *obj_request;
2970 struct page **pages = NULL;
2975 page_count = (u32) calc_pages_for(offset, length);
2976 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2978 ret = PTR_ERR(pages);
2981 obj_request = rbd_obj_request_create(object_name, offset, length,
2986 obj_request->pages = pages;
2987 obj_request->page_count = page_count;
2989 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2990 if (!obj_request->osd_req)
2993 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2994 offset, length, 0, 0);
2995 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2997 obj_request->length,
2998 obj_request->offset & ~PAGE_MASK,
3000 rbd_osd_req_format_read(obj_request);
3002 ret = rbd_obj_request_submit(osdc, obj_request);
3005 ret = rbd_obj_request_wait(obj_request);
3009 ret = obj_request->result;
3013 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3014 size = (size_t) obj_request->xferred;
3015 ceph_copy_from_page_vector(pages, buf, 0, size);
3016 rbd_assert(size <= (size_t)INT_MAX);
3020 rbd_obj_request_put(obj_request);
3022 ceph_release_page_vector(pages, page_count);
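/*
 * Usage sketch (hypothetical buffer): read the first 4 KB of the
 * image's header object, much as rbd_dev_v1_header_read() below does
 * with the full on-disk header size.  A non-negative return value is
 * the number of bytes actually read.
 */
#if 0
	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				0, 4096, buf);
	if (ret < 0)
		return ret;
#endif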
3028 * Read the complete header for the given rbd device.
3030 * Returns a pointer to a dynamically-allocated buffer containing
3031 * the complete and validated header.
3035 * Returns a pointer-coded errno if a failure occurs.
3037 static struct rbd_image_header_ondisk *
3038 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3040 struct rbd_image_header_ondisk *ondisk = NULL;
3047 * The complete header will include an array of its 64-bit
3048 * snapshot ids, followed by the names of those snapshots as
3049 * a contiguous block of NUL-terminated strings. Note that
3050 * the number of snapshots could change by the time we read
3051 * it in, in which case we re-read it.
3058 size = sizeof (*ondisk);
3059 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3061 ondisk = kmalloc(size, GFP_KERNEL);
3063 return ERR_PTR(-ENOMEM);
3065 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3069 if ((size_t)ret < size) {
3071 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3075 if (!rbd_dev_ondisk_valid(ondisk)) {
3077 rbd_warn(rbd_dev, "invalid header");
3081 names_size = le64_to_cpu(ondisk->snap_names_len);
3082 want_count = snap_count;
3083 snap_count = le32_to_cpu(ondisk->snap_count);
3084 } while (snap_count != want_count);
3091 return ERR_PTR(ret);
3095 * Reload the on-disk header.
3097 static int rbd_read_header(struct rbd_device *rbd_dev,
3098 struct rbd_image_header *header)
3100 struct rbd_image_header_ondisk *ondisk;
3103 ondisk = rbd_dev_v1_header_read(rbd_dev);
3105 return PTR_ERR(ondisk);
3106 ret = rbd_header_from_disk(header, ondisk);
3113 * Re-read the header for a format 1 image and update the in-memory copy.
3115 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3118 struct rbd_image_header h;
3120 ret = rbd_read_header(rbd_dev, &h);
3124 down_write(&rbd_dev->header_rwsem);
3126 /* Update image size, and check for resize of mapped image */
3127 rbd_dev->header.image_size = h.image_size;
3128 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3129 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3130 rbd_dev->mapping.size = rbd_dev->header.image_size;
3132 /* rbd_dev->header.object_prefix shouldn't change */
3133 kfree(rbd_dev->header.snap_sizes);
3134 kfree(rbd_dev->header.snap_names);
3135 /* osd requests may still refer to snapc */
3136 ceph_put_snap_context(rbd_dev->header.snapc);
3138 rbd_dev->header.image_size = h.image_size;
3139 rbd_dev->header.snapc = h.snapc;
3140 rbd_dev->header.snap_names = h.snap_names;
3141 rbd_dev->header.snap_sizes = h.snap_sizes;
3142 /* Free the extra copy of the object prefix */
3143 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3144 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3145 kfree(h.object_prefix);
3147 up_write(&rbd_dev->header_rwsem);
3153 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3154 * has disappeared from the (just updated) snapshot context.
3156 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3160 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3163 snap_id = rbd_dev->spec->snap_id;
3164 if (snap_id == CEPH_NOSNAP)
3167 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3168 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3171 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3176 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3177 mapping_size = rbd_dev->mapping.size;
3178 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3179 if (rbd_dev->image_format == 1)
3180 ret = rbd_dev_v1_refresh(rbd_dev);
3182 ret = rbd_dev_v2_refresh(rbd_dev);
3184 /* If it's a mapped snapshot, validate its EXISTS flag */
3186 rbd_exists_validate(rbd_dev);
3187 mutex_unlock(&ctl_mutex);
3188 if (mapping_size != rbd_dev->mapping.size) {
3191 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3192 dout("setting size to %llu sectors", (unsigned long long)size);
3193 set_capacity(rbd_dev->disk, size);
3194 revalidate_disk(rbd_dev->disk);
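/*
 * Example of the arithmetic above (illustrative): a mapping that has
 * grown to 1 GB yields 1073741824 / 512 = 2097152 sectors passed to
 * set_capacity().
 */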
3200 static int rbd_init_disk(struct rbd_device *rbd_dev)
3202 struct gendisk *disk;
3203 struct request_queue *q;
3206 /* create gendisk info */
3207 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3211 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3213 disk->major = rbd_dev->major;
3214 disk->first_minor = 0;
3215 disk->fops = &rbd_bd_ops;
3216 disk->private_data = rbd_dev;
3218 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3222 /* We use the default size, but let's be explicit about it. */
3223 blk_queue_physical_block_size(q, SECTOR_SIZE);
3225 /* set io sizes to object size */
3226 segment_size = rbd_obj_bytes(&rbd_dev->header);
3227 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3228 blk_queue_max_segment_size(q, segment_size);
3229 blk_queue_io_min(q, segment_size);
3230 blk_queue_io_opt(q, segment_size);
3232 blk_queue_merge_bvec(q, rbd_merge_bvec);
3235 q->queuedata = rbd_dev;
3237 rbd_dev->disk = disk;
3250 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3252 return container_of(dev, struct rbd_device, dev);
3255 static ssize_t rbd_size_show(struct device *dev,
3256 struct device_attribute *attr, char *buf)
3258 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3260 return sprintf(buf, "%llu\n",
3261 (unsigned long long)rbd_dev->mapping.size);
3265 * Note this shows the features for whatever's mapped, which is not
3266 * necessarily the base image.
3268 static ssize_t rbd_features_show(struct device *dev,
3269 struct device_attribute *attr, char *buf)
3271 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3273 return sprintf(buf, "0x%016llx\n",
3274 (unsigned long long)rbd_dev->mapping.features);
3277 static ssize_t rbd_major_show(struct device *dev,
3278 struct device_attribute *attr, char *buf)
3280 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3283 return sprintf(buf, "%d\n", rbd_dev->major);
3285 return sprintf(buf, "(none)\n");
3289 static ssize_t rbd_client_id_show(struct device *dev,
3290 struct device_attribute *attr, char *buf)
3292 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294 return sprintf(buf, "client%lld\n",
3295 ceph_client_id(rbd_dev->rbd_client->client));
3298 static ssize_t rbd_pool_show(struct device *dev,
3299 struct device_attribute *attr, char *buf)
3301 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3303 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3306 static ssize_t rbd_pool_id_show(struct device *dev,
3307 struct device_attribute *attr, char *buf)
3309 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3311 return sprintf(buf, "%llu\n",
3312 (unsigned long long) rbd_dev->spec->pool_id);
3315 static ssize_t rbd_name_show(struct device *dev,
3316 struct device_attribute *attr, char *buf)
3318 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3320 if (rbd_dev->spec->image_name)
3321 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3323 return sprintf(buf, "(unknown)\n");
3326 static ssize_t rbd_image_id_show(struct device *dev,
3327 struct device_attribute *attr, char *buf)
3329 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3331 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3335 * Shows the name of the currently-mapped snapshot (or
3336 * RBD_SNAP_HEAD_NAME for the base image).
3338 static ssize_t rbd_snap_show(struct device *dev,
3339 struct device_attribute *attr,
3342 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3344 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3348 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3349 * for the parent image. If there is no parent, simply shows
3350 * "(no parent image)".
3352 static ssize_t rbd_parent_show(struct device *dev,
3353 struct device_attribute *attr,
3356 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3357 struct rbd_spec *spec = rbd_dev->parent_spec;
3362 return sprintf(buf, "(no parent image)\n");
3364 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3365 (unsigned long long) spec->pool_id, spec->pool_name);
3370 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3371 spec->image_name ? spec->image_name : "(unknown)");
3376 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3377 (unsigned long long) spec->snap_id, spec->snap_name);
3382 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3387 return (ssize_t) (bufp - buf);
3390 static ssize_t rbd_image_refresh(struct device *dev,
3391 struct device_attribute *attr,
3395 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3398 ret = rbd_dev_refresh(rbd_dev);
3400 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3402 return ret < 0 ? ret : size;
3405 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3406 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3407 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3408 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3409 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3410 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3411 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3412 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3413 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3414 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3415 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3417 static struct attribute *rbd_attrs[] = {
3418 &dev_attr_size.attr,
3419 &dev_attr_features.attr,
3420 &dev_attr_major.attr,
3421 &dev_attr_client_id.attr,
3422 &dev_attr_pool.attr,
3423 &dev_attr_pool_id.attr,
3424 &dev_attr_name.attr,
3425 &dev_attr_image_id.attr,
3426 &dev_attr_current_snap.attr,
3427 &dev_attr_parent.attr,
3428 &dev_attr_refresh.attr,
3432 static struct attribute_group rbd_attr_group = {
3436 static const struct attribute_group *rbd_attr_groups[] = {
3441 static void rbd_sysfs_dev_release(struct device *dev)
3445 static struct device_type rbd_device_type = {
3447 .groups = rbd_attr_groups,
3448 .release = rbd_sysfs_dev_release,
3451 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3453 kref_get(&spec->kref);
3458 static void rbd_spec_free(struct kref *kref);
3459 static void rbd_spec_put(struct rbd_spec *spec)
3462 kref_put(&spec->kref, rbd_spec_free);
3465 static struct rbd_spec *rbd_spec_alloc(void)
3467 struct rbd_spec *spec;
3469 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3472 kref_init(&spec->kref);
3477 static void rbd_spec_free(struct kref *kref)
3479 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3481 kfree(spec->pool_name);
3482 kfree(spec->image_id);
3483 kfree(spec->image_name);
3484 kfree(spec->snap_name);
3488 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3489 struct rbd_spec *spec)
3491 struct rbd_device *rbd_dev;
3493 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3497 spin_lock_init(&rbd_dev->lock);
3499 INIT_LIST_HEAD(&rbd_dev->node);
3500 init_rwsem(&rbd_dev->header_rwsem);
3502 rbd_dev->spec = spec;
3503 rbd_dev->rbd_client = rbdc;
3505 /* Initialize the layout used for all rbd requests */
3507 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3508 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3509 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3510 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3515 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3517 rbd_put_client(rbd_dev->rbd_client);
3518 rbd_spec_put(rbd_dev->spec);
3523 * Get the size and object order for an image snapshot, or if
3524 * snap_id is CEPH_NOSNAP, get this information for the base image.
3527 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3528 u8 *order, u64 *snap_size)
3530 __le64 snapid = cpu_to_le64(snap_id);
3535 } __attribute__ ((packed)) size_buf = { 0 };
3537 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3539 &snapid, sizeof (snapid),
3540 &size_buf, sizeof (size_buf));
3541 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3544 if (ret < sizeof (size_buf))
3548 *order = size_buf.order;
3549 *snap_size = le64_to_cpu(size_buf.size);
3551 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3552 (unsigned long long)snap_id, (unsigned int)*order,
3553 (unsigned long long)*snap_size);
3558 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3560 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3561 &rbd_dev->header.obj_order,
3562 &rbd_dev->header.image_size);
3565 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3571 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3575 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3576 "rbd", "get_object_prefix", NULL, 0,
3577 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3578 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3583 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3584 p + ret, NULL, GFP_NOIO);
3587 if (IS_ERR(rbd_dev->header.object_prefix)) {
3588 ret = PTR_ERR(rbd_dev->header.object_prefix);
3589 rbd_dev->header.object_prefix = NULL;
3591 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3599 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3602 __le64 snapid = cpu_to_le64(snap_id);
3606 } __attribute__ ((packed)) features_buf = { 0 };
3610 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3611 "rbd", "get_features",
3612 &snapid, sizeof (snapid),
3613 &features_buf, sizeof (features_buf));
3614 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3617 if (ret < sizeof (features_buf))
3620 incompat = le64_to_cpu(features_buf.incompat);
3621 if (incompat & ~RBD_FEATURES_SUPPORTED)
3624 *snap_features = le64_to_cpu(features_buf.features);
3626 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3627 (unsigned long long)snap_id,
3628 (unsigned long long)*snap_features,
3629 (unsigned long long)le64_to_cpu(features_buf.incompat));
3634 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3636 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3637 &rbd_dev->header.features);
3640 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3642 struct rbd_spec *parent_spec;
3644 void *reply_buf = NULL;
3652 parent_spec = rbd_spec_alloc();
3656 size = sizeof (__le64) + /* pool_id */
3657 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3658 sizeof (__le64) + /* snap_id */
3659 sizeof (__le64); /* overlap */
3660 reply_buf = kmalloc(size, GFP_KERNEL);
3666 snapid = cpu_to_le64(CEPH_NOSNAP);
3667 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3668 "rbd", "get_parent",
3669 &snapid, sizeof (snapid),
3671 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3676 end = reply_buf + ret;
3678 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3679 if (parent_spec->pool_id == CEPH_NOPOOL)
3680 goto out; /* No parent? No problem. */
3682 /* The ceph file layout needs to fit pool id in 32 bits */
3685 if (parent_spec->pool_id > (u64)U32_MAX) {
3686 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3687 (unsigned long long)parent_spec->pool_id, U32_MAX);
3691 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3692 if (IS_ERR(image_id)) {
3693 ret = PTR_ERR(image_id);
3696 parent_spec->image_id = image_id;
3697 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3698 ceph_decode_64_safe(&p, end, overlap, out_err);
3700 rbd_dev->parent_overlap = overlap;
3701 rbd_dev->parent_spec = parent_spec;
3702 parent_spec = NULL; /* rbd_dev now owns this */
3707 rbd_spec_put(parent_spec);
3712 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3716 __le64 stripe_count;
3717 } __attribute__ ((packed)) striping_info_buf = { 0 };
3718 size_t size = sizeof (striping_info_buf);
3725 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3726 "rbd", "get_stripe_unit_count", NULL, 0,
3727 (char *)&striping_info_buf, size);
3728 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3735 * We don't actually support the "fancy striping" feature
3736 * (STRIPINGV2) yet, but if the striping sizes are the
3737 * defaults the behavior is the same as before. So find
3738 * out, and only fail if the image has non-default values.
3741 obj_size = (u64)1 << rbd_dev->header.obj_order;
3742 p = &striping_info_buf;
3743 stripe_unit = ceph_decode_64(&p);
3744 if (stripe_unit != obj_size) {
3745 rbd_warn(rbd_dev, "unsupported stripe unit "
3746 "(got %llu want %llu)",
3747 stripe_unit, obj_size);
3750 stripe_count = ceph_decode_64(&p);
3751 if (stripe_count != 1) {
3752 rbd_warn(rbd_dev, "unsupported stripe count "
3753 "(got %llu want 1)", stripe_count);
3756 rbd_dev->header.stripe_unit = stripe_unit;
3757 rbd_dev->header.stripe_count = stripe_count;
3762 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3764 size_t image_id_size;
3769 void *reply_buf = NULL;
3771 char *image_name = NULL;
3774 rbd_assert(!rbd_dev->spec->image_name);
3776 len = strlen(rbd_dev->spec->image_id);
3777 image_id_size = sizeof (__le32) + len;
3778 image_id = kmalloc(image_id_size, GFP_KERNEL);
3783 end = image_id + image_id_size;
3784 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3786 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3787 reply_buf = kmalloc(size, GFP_KERNEL);
3791 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3792 "rbd", "dir_get_name",
3793 image_id, image_id_size,
3798 end = reply_buf + ret;
3800 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3801 if (IS_ERR(image_name))
3804 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3812 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3814 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3815 const char *snap_name;
3818 /* Skip over names until we find the one we are looking for */
3820 snap_name = rbd_dev->header.snap_names;
3821 while (which < snapc->num_snaps) {
3822 if (!strcmp(name, snap_name))
3823 return snapc->snaps[which];
3824 snap_name += strlen(snap_name) + 1;
3830 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3832 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3837 for (which = 0; !found && which < snapc->num_snaps; which++) {
3838 const char *snap_name;
3840 snap_id = snapc->snaps[which];
3841 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3842 if (IS_ERR(snap_name))
3844 found = !strcmp(name, snap_name);
3847 return found ? snap_id : CEPH_NOSNAP;
3851 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3852 * no snapshot by that name is found, or if an error occurs.
3854 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3856 if (rbd_dev->image_format == 1)
3857 return rbd_v1_snap_id_by_name(rbd_dev, name);
3859 return rbd_v2_snap_id_by_name(rbd_dev, name);
3863 * When an rbd image has a parent image, it is identified by the
3864 * pool, image, and snapshot ids (not names). This function fills
3865 * in the names for those ids. (It's OK if we can't figure out the
3866 * name for an image id, but the pool and snapshot ids should always
3867 * exist and have names.) All names in an rbd spec are dynamically allocated.
3870 * When an image being mapped (not a parent) is probed, we have the
3871 * pool name and pool id, image name and image id, and the snapshot
3872 * name. The only thing we're missing is the snapshot id.
3874 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3876 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3877 struct rbd_spec *spec = rbd_dev->spec;
3878 const char *pool_name;
3879 const char *image_name;
3880 const char *snap_name;
3884 * An image being mapped will have the pool name (etc.), but
3885 * we need to look up the snapshot id.
3887 if (spec->pool_name) {
3888 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3891 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3892 if (snap_id == CEPH_NOSNAP)
3894 spec->snap_id = snap_id;
3896 spec->snap_id = CEPH_NOSNAP;
3902 /* Get the pool name; we have to make our own copy of this */
3904 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3906 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3909 pool_name = kstrdup(pool_name, GFP_KERNEL);
3913 /* Fetch the image name; tolerate failure here */
3915 image_name = rbd_dev_image_name(rbd_dev);
3917 rbd_warn(rbd_dev, "unable to get image name");
3919 /* Look up the snapshot name, and make a copy */
3921 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3927 spec->pool_name = pool_name;
3928 spec->image_name = image_name;
3929 spec->snap_name = snap_name;
3939 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3948 struct ceph_snap_context *snapc;
3952 * We'll need room for the seq value (maximum snapshot id),
3953 * snapshot count, and array of that many snapshot ids.
3954 * For now we have a fixed upper limit on the number we're
3955 * prepared to receive.
3957 size = sizeof (__le64) + sizeof (__le32) +
3958 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3959 reply_buf = kzalloc(size, GFP_KERNEL);
3963 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3964 "rbd", "get_snapcontext", NULL, 0,
3966 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3971 end = reply_buf + ret;
3973 ceph_decode_64_safe(&p, end, seq, out);
3974 ceph_decode_32_safe(&p, end, snap_count, out);
3977 * Make sure the reported number of snapshot ids wouldn't go
3978 * beyond the end of our buffer. But before checking that,
3979 * make sure the computed size of the snapshot context we
3980 * allocate is representable in a size_t.
3982 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3987 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3991 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3997 for (i = 0; i < snap_count; i++)
3998 snapc->snaps[i] = ceph_decode_64(&p);
4000 ceph_put_snap_context(rbd_dev->header.snapc);
4001 rbd_dev->header.snapc = snapc;
4003 dout(" snap context seq = %llu, snap_count = %u\n",
4004 (unsigned long long)seq, (unsigned int)snap_count);
4011 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4022 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4023 reply_buf = kmalloc(size, GFP_KERNEL);
4025 return ERR_PTR(-ENOMEM);
4027 snapid = cpu_to_le64(snap_id);
4028 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4029 "rbd", "get_snapshot_name",
4030 &snapid, sizeof (snapid),
4032 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4034 snap_name = ERR_PTR(ret);
4039 end = reply_buf + ret;
4040 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4041 if (IS_ERR(snap_name))
4044 dout(" snap_id 0x%016llx snap_name = %s\n",
4045 (unsigned long long)snap_id, snap_name);
4052 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4056 down_write(&rbd_dev->header_rwsem);
4058 ret = rbd_dev_v2_image_size(rbd_dev);
4061 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4062 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4063 rbd_dev->mapping.size = rbd_dev->header.image_size;
4065 ret = rbd_dev_v2_snap_context(rbd_dev);
4066 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4070 up_write(&rbd_dev->header_rwsem);
4075 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4080 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4082 dev = &rbd_dev->dev;
4083 dev->bus = &rbd_bus_type;
4084 dev->type = &rbd_device_type;
4085 dev->parent = &rbd_root_dev;
4086 dev->release = rbd_dev_device_release;
4087 dev_set_name(dev, "%d", rbd_dev->dev_id);
4088 ret = device_register(dev);
4090 mutex_unlock(&ctl_mutex);
4095 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4097 device_unregister(&rbd_dev->dev);
4100 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4103 * Get a unique rbd identifier for the given new rbd_dev, and add
4104 * the rbd_dev to the global list. The minimum rbd id is 1.
4106 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4108 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4110 spin_lock(&rbd_dev_list_lock);
4111 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4112 spin_unlock(&rbd_dev_list_lock);
4113 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4114 (unsigned long long) rbd_dev->dev_id);
4118 * Remove an rbd_dev from the global list, and record that its
4119 * identifier is no longer in use.
4121 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4123 struct list_head *tmp;
4124 int rbd_id = rbd_dev->dev_id;
4127 rbd_assert(rbd_id > 0);
4129 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4130 (unsigned long long) rbd_dev->dev_id);
4131 spin_lock(&rbd_dev_list_lock);
4132 list_del_init(&rbd_dev->node);
4135 * If the id being "put" is not the current maximum, there
4136 * is nothing special we need to do.
4138 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4139 spin_unlock(&rbd_dev_list_lock);
4144 * We need to update the current maximum id. Search the
4145 * list to find out what it is. We're more likely to find
4146 * the maximum at the end, so search the list backward.
4149 list_for_each_prev(tmp, &rbd_dev_list) {
4150 struct rbd_device *rbd_dev;
4152 rbd_dev = list_entry(tmp, struct rbd_device, node);
4153 if (rbd_dev->dev_id > max_id)
4154 max_id = rbd_dev->dev_id;
4156 spin_unlock(&rbd_dev_list_lock);
4159 * The max id could have been updated by rbd_dev_id_get(), in
4160 * which case it now accurately reflects the new maximum.
4161 * Be careful not to overwrite the maximum value in that case.
4164 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4165 dout(" max dev id has been reset\n");
4169 * Skips over white space at *buf, and updates *buf to point to the
4170 * first found non-space character (if any). Returns the length of
4171 * the token (string of non-white space characters) found. Note
4172 * that *buf must be terminated with '\0'.
4174 static inline size_t next_token(const char **buf)
4177 * These are the characters that produce nonzero for
4178 * isspace() in the "C" and "POSIX" locales.
4180 const char *spaces = " \f\n\r\t\v";
4182 *buf += strspn(*buf, spaces); /* Find start of token */
4184 return strcspn(*buf, spaces); /* Return token length */
4188 * Finds the next token in *buf, and if the provided token buffer is
4189 * big enough, copies the found token into it. The result, if
4190 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4191 * must be terminated with '\0' on entry.
4193 * Returns the length of the token found (not including the '\0').
4194 * Return value will be 0 if no token is found, and it will be >=
4195 * token_size if the token would not fit.
4197 * The *buf pointer will be updated to point beyond the end of the
4198 * found token. Note that this occurs even if the token buffer is
4199 * too small to hold it.
4201 static inline size_t copy_token(const char **buf,
4207 len = next_token(buf);
4208 if (len < token_size) {
4209 memcpy(token, *buf, len);
4210 *(token + len) = '\0';
4218 * Finds the next token in *buf, dynamically allocates a buffer big
4219 * enough to hold a copy of it, and copies the token into the new
4220 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4221 * that a duplicate buffer is created even for a zero-length token.
4223 * Returns a pointer to the newly-allocated duplicate, or a null
4224 * pointer if memory for the duplicate was not available. If
4225 * the lenp argument is a non-null pointer, the length of the token
4226 * (not including the '\0') is returned in *lenp.
4228 * If successful, the *buf pointer will be updated to point beyond
4229 * the end of the found token.
4231 * Note: uses GFP_KERNEL for allocation.
4233 static inline char *dup_token(const char **buf, size_t *lenp)
4238 len = next_token(buf);
4239 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4242 *(dup + len) = '\0';
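/*
 * Parsing sketch (hypothetical option string; fragment only): pulling
 * two tokens out of a buffer with dup_token().
 */
#if 0
	const char *buf = "rbd foo";
	char *pool_name = dup_token(&buf, NULL);	/* "rbd" */
	char *image_name = dup_token(&buf, NULL);	/* "foo" */

	kfree(image_name);
	kfree(pool_name);
#endif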
4252 * Parse the options provided for an "rbd add" (i.e., rbd image
4253 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4254 * and the data written is passed here via a NUL-terminated buffer.
4255 * Returns 0 if successful or an error code otherwise.
4257 * The information extracted from these options is recorded in
4258 * the other parameters, which return dynamically-allocated structures:
4261 * The address of a pointer that will refer to a ceph options
4262 * structure. Caller must release the returned pointer using
4263 * ceph_destroy_options() when it is no longer needed.
4265 * Address of an rbd options pointer. Fully initialized by
4266 * this function; caller must release with kfree().
4268 * Address of an rbd image specification pointer. Fully
4269 * initialized by this function based on parsed options.
4270 * Caller must release with rbd_spec_put().
4272 * The options passed take this form:
4273 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4276 * A comma-separated list of one or more monitor addresses.
4277 * A monitor address is an ip address, optionally followed
4278 * by a port number (separated by a colon).
4279 * I.e.: ip1[:port1][,ip2[:port2]...]
4281 * A comma-separated list of ceph and/or rbd options.
4283 * The name of the rados pool containing the rbd image.
4285 * The name of the image in that pool to map.
4287 * An optional snapshot name. If provided, the mapping will
4288 * present data from the image at the time that snapshot was
4289 * created. The image head is used if no snapshot name is
4290 * provided. Snapshot mappings are always read-only.
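/*
 * Example (hypothetical monitor addresses and names): writing the
 * following to /sys/bus/rbd/add maps the head of image "foo" in
 * pool "rbd" using two monitors:
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=<key> rbd foo
 */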
4292 static int rbd_add_parse_args(const char *buf,
4293 struct ceph_options **ceph_opts,
4294 struct rbd_options **opts,
4295 struct rbd_spec **rbd_spec)
4299 const char *mon_addrs;
4301 size_t mon_addrs_size;
4302 struct rbd_spec *spec = NULL;
4303 struct rbd_options *rbd_opts = NULL;
4304 struct ceph_options *copts;
4307 /* The first four tokens are required */
4309 len = next_token(&buf);
4311 rbd_warn(NULL, "no monitor address(es) provided");
4315 mon_addrs_size = len + 1;
4319 options = dup_token(&buf, NULL);
4323 rbd_warn(NULL, "no options provided");
4327 spec = rbd_spec_alloc();
4331 spec->pool_name = dup_token(&buf, NULL);
4332 if (!spec->pool_name)
4334 if (!*spec->pool_name) {
4335 rbd_warn(NULL, "no pool name provided");
4339 spec->image_name = dup_token(&buf, NULL);
4340 if (!spec->image_name)
4342 if (!*spec->image_name) {
4343 rbd_warn(NULL, "no image name provided");
4348 * Snapshot name is optional; default is to use "-"
4349 * (indicating the head/no snapshot).
4351 len = next_token(&buf);
4353 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4354 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4355 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4356 ret = -ENAMETOOLONG;
4359 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4362 *(snap_name + len) = '\0';
4363 spec->snap_name = snap_name;
4365 /* Initialize all rbd options to the defaults */
4367 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4371 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4373 copts = ceph_parse_options(options, mon_addrs,
4374 mon_addrs + mon_addrs_size - 1,
4375 parse_rbd_opts_token, rbd_opts);
4376 if (IS_ERR(copts)) {
4377 ret = PTR_ERR(copts);
4398 * An rbd format 2 image has a unique identifier, distinct from the
4399 * name given to it by the user. Internally, that identifier is
4400 * what's used to specify the names of objects related to the image.
4402 * A special "rbd id" object is used to map an rbd image name to its
4403 * id. If that object doesn't exist, then there is no v2 rbd image
4404 * with the supplied name.
4406 * This function will record the given rbd_dev's image_id field if
4407 * it can be determined, and in that case will return 0. If any
4408 * errors occur a negative errno will be returned and the rbd_dev's
4409 * image_id field will be unchanged (and should be NULL).
4411 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4420 * When probing a parent image, the image id is already
4421 * known (and the image name likely is not). There's no
4422 * need to fetch the image id again in this case. We
4423 * do still need to set the image format though.
4425 if (rbd_dev->spec->image_id) {
4426 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4432 * First, see if the format 2 image id file exists, and if
4433 * so, get the image's persistent id from it.
4435 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4436 object_name = kmalloc(size, GFP_NOIO);
4439 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4440 dout("rbd id object name is %s\n", object_name);
4442 /* Response will be an encoded string, which includes a length */
4444 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4445 response = kzalloc(size, GFP_NOIO);
4451 /* If it doesn't exist we'll assume it's a format 1 image */
4453 ret = rbd_obj_method_sync(rbd_dev, object_name,
4454 "rbd", "get_id", NULL, 0,
4455 response, RBD_IMAGE_ID_LEN_MAX);
4456 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4457 if (ret == -ENOENT) {
4458 image_id = kstrdup("", GFP_KERNEL);
4459 ret = image_id ? 0 : -ENOMEM;
4461 rbd_dev->image_format = 1;
4462 } else if (ret > sizeof (__le32)) {
4465 image_id = ceph_extract_encoded_string(&p, p + ret,
4467 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4469 rbd_dev->image_format = 2;
4475 rbd_dev->spec->image_id = image_id;
4476 dout("image_id is %s\n", image_id);
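/*
 * Naming example (hypothetical image name): for an image named "foo",
 * the id object probed above is "rbd_id.foo" (RBD_ID_PREFIX followed
 * by the image name), and its reply encodes the image's id string.
 */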
4485 /* Undo whatever state changes are made by v1 or v2 image probe */
4487 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4489 struct rbd_image_header *header;
4491 rbd_dev_remove_parent(rbd_dev);
4492 rbd_spec_put(rbd_dev->parent_spec);
4493 rbd_dev->parent_spec = NULL;
4494 rbd_dev->parent_overlap = 0;
4496 /* Free dynamic fields from the header, then zero it out */
4498 header = &rbd_dev->header;
4499 ceph_put_snap_context(header->snapc);
4500 kfree(header->snap_sizes);
4501 kfree(header->snap_names);
4502 kfree(header->object_prefix);
4503 memset(header, 0, sizeof (*header));
4506 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4510 /* Populate rbd image metadata */
4512 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4516 /* Version 1 images have no parent (no layering) */
4518 rbd_dev->parent_spec = NULL;
4519 rbd_dev->parent_overlap = 0;
4521 dout("discovered version 1 image, header name is %s\n",
4522 rbd_dev->header_name);
4527 kfree(rbd_dev->header_name);
4528 rbd_dev->header_name = NULL;
4529 kfree(rbd_dev->spec->image_id);
4530 rbd_dev->spec->image_id = NULL;
4535 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4539 ret = rbd_dev_v2_image_size(rbd_dev);
4543 /* Get the object prefix (a.k.a. block_name) for the image */
4545 ret = rbd_dev_v2_object_prefix(rbd_dev);
4549 /* Get and check the features for the image */
4551 ret = rbd_dev_v2_features(rbd_dev);
4555 /* If the image supports layering, get the parent info */
4557 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4558 ret = rbd_dev_v2_parent_info(rbd_dev);
4562 * Print a warning if this image has a parent.
4563 * Don't print it if the image now being probed
4564 * is itself a parent. We can tell at this point
4565 * because we won't know its pool name yet (just its pool id).
4568 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4569 rbd_warn(rbd_dev, "WARNING: kernel layering "
4570 "is EXPERIMENTAL!");
4573 /* If the image supports fancy striping, get its parameters */
4575 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4576 ret = rbd_dev_v2_striping_info(rbd_dev);
4581 /* crypto and compression type aren't (yet) supported for v2 images */
4583 rbd_dev->header.crypt_type = 0;
4584 rbd_dev->header.comp_type = 0;
4586 /* Get the snapshot context, plus the header version */
4588 ret = rbd_dev_v2_snap_context(rbd_dev);
4592 dout("discovered version 2 image, header name is %s\n",
4593 rbd_dev->header_name);
4597 rbd_dev->parent_overlap = 0;
4598 rbd_spec_put(rbd_dev->parent_spec);
4599 rbd_dev->parent_spec = NULL;
4600 kfree(rbd_dev->header_name);
4601 rbd_dev->header_name = NULL;
4602 kfree(rbd_dev->header.object_prefix);
4603 rbd_dev->header.object_prefix = NULL;
4608 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4610 struct rbd_device *parent = NULL;
4611 struct rbd_spec *parent_spec;
4612 struct rbd_client *rbdc;
4615 if (!rbd_dev->parent_spec)
4618 * We need to pass a reference to the client and the parent
4619 * spec when creating the parent rbd_dev. Images related by
4620 * parent/child relationships always share both.
4622 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4623 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4626 parent = rbd_dev_create(rbdc, parent_spec);
4630 ret = rbd_dev_image_probe(parent);
4633 rbd_dev->parent = parent;
4638 rbd_spec_put(rbd_dev->parent_spec);
4639 kfree(rbd_dev->header_name);
4640 rbd_dev_destroy(parent);
4642 rbd_put_client(rbdc);
4643 rbd_spec_put(parent_spec);
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		return ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */
	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */
	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/* Everything's ready.  Announce the disk to the world. */
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}
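/*
 * Once add_disk() above returns, the mapped image is visible as an
 * ordinary block device (typically /dev/rbd<id>, with partition
 * minors drawn from the RBD_MINORS_PER_MAJOR range per device).
 */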
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);

	return 0;
}
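/*
 * For example (names illustrative; RBD_SUFFIX and RBD_HEADER_PREFIX
 * are defined in rbd_types.h as ".rbd" and "rbd_header."): a format 1
 * image named "foo" gets header object "foo.rbd", while a format 2
 * image with id "1028d3f9272" gets "rbd_header.1028d3f9272".
 */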
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	int ret;

	rbd_dev_unprobe(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (!ret)
		return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
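/*
 * Probe sequence summary: determine the image id (and thus format),
 * build the header object name, register a watch on the header
 * object, read the format-specific header, resolve the spec's names,
 * then recursively probe any parent.  Each error label above unwinds
 * exactly the steps that preceded it, in reverse order.
 */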
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	ssize_t rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */
	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rc = -ENOMEM;
	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_image_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (!rc)
		return count;

	rbd_dev_image_release(rbd_dev);	/* also destroys rbd_dev */
	goto err_out_module;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}
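/*
 * Example usage (a sketch; monitor address, credentials, and names
 * are illustrative -- see Documentation/ABI/testing/sysfs-bus-rbd for
 * the authoritative syntax).  Mapping image "foo" from pool "rbd":
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *		> /sys/bus/rbd/add
 *
 * On success the new mapping is assigned the next free device id and
 * announced via the pr_info() in rbd_dev_device_setup() above.
 */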
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);

	return NULL;
}
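/*
 * Note that rbd_dev_list_lock is dropped before the matching device
 * is used by the caller; rbd_remove() below relies on ctl_mutex to
 * keep the returned pointer valid until the removal completes.
 */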
static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_clear_mapping(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
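/*
 * Example: given a chain base <- mid <- child (child->parent == mid,
 * mid->parent == base), the inner loop above walks down to "base"
 * first, so ancestors are released deepest-first: base, then mid.
 * The child itself is left for the caller to release.
 */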
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = -ENOENT;
	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev)
		goto done;

	ret = 0;
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;	/* still open; refuse to remove */
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
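/*
 * Example usage (sketch): unmap the device with id 0, which fails
 * with -EBUSY while the block device is still held open:
 *
 *	# echo 0 > /sys/bus/rbd/remove
 */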
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
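/*
 * Registering rbd_bus_type is what makes the /sys/bus/rbd directory
 * (and the add/remove bus attributes bound to rbd_add() and
 * rbd_remove() above) appear; rbd_root_dev serves as the parent
 * device under which mapped images are placed.
 */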
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}
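/*
 * These caches back the allocations made on every I/O: roughly one
 * image request per block request, one or more object requests per
 * image request, and a scratch buffer for each object name.
 * Creating them once at module load keeps the I/O path cheap.
 */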
static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");