2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* Driver name as it appears in device nodes and log messages. */
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
/* Limits on user-supplied snapshot names and mount-option strings. */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
/* Sentinel "snapshot" name meaning the live (head) image, not a snapshot. */
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
/* Upper bound on decimal digits of an int: sizeof(int)*5/2 + 1 (conservative). */
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/* Default watch/notify timeout, in seconds, used when no option is given. */
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/*
 * In-memory data structures for the driver.
 *
 * NOTE(review): this excerpt is missing many interior lines (field
 * declarations, closing braces), so each struct below is only a
 * fragment — confirm the full layout against the complete source.
 */
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
/* rbd_client: one per cluster connection; refcounted and shareable. */
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
/* rbd_req_status: per-sub-request completion record (done/rc/bytes). */
108 * a request completion status
110 struct rbd_req_status {
/* rbd_req_coll: refcounted set of per-segment statuses for one blk request. */
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
/* rbd_request: context carried alongside one in-flight OSD request. */
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
/* rbd_device: per-mapped-image state tying the blkdev to its rados image. */
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
/*
 * Module-wide state: global device/client lists with their spinlocks,
 * forward declarations, and the sysfs bus ("/sys/bus/rbd") plus the
 * root device that all mapped rbd devices hang off.
 */
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
/* Bus-level "add"/"remove" attributes: write-only entry points for mapping. */
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
/* Root device needs a release callback even if it does nothing. */
223 static void rbd_root_dev_release(struct device *dev)
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
/* Take/drop a reference on the embedded struct device of an rbd_device. */
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
/*
 * Block-device open/release hooks.
 *
 * rbd_open() rejects writable opens of a read-only mapping (snapshot
 * mappings are read-only), pins the device, and marks the bdev RO state.
 * NOTE(review): the error-return path for the FMODE_WRITE check is not
 * visible in this excerpt — confirm it returns -EROFS in the full source.
 */
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
/* release: drop the reference taken in rbd_open(). */
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 struct rbd_device *rbd_dev = disk->private_data;
262 rbd_put_dev(rbd_dev);
/* Block-device operations table; .open presumably set in the elided line. */
267 static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
270 .release = rbd_release,
/*
 * Create a new rbd_client: allocate, open a ceph session, and add it to
 * the global client list so later mappings can share it.
 *
 * Ownership: on success the new client owns ceph_opts (via rbdc->client)
 * and takes ownership of rbd_opts. NOTE(review): the error-unwind labels
 * between lines 310-320 are elided here; confirm cleanup order (destroy
 * client, unlock, free rbdc, destroy ceph_opts) against the full source.
 */
274 * Initialize an rbd client instance.
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 struct rbd_options *rbd_opts)
280 struct rbd_client *rbdc;
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
/* Nested lock class: this can be called while another ctl_mutex user runs. */
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 if (IS_ERR(rbdc->client))
296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 ret = ceph_open_session(rbdc->client);
302 rbdc->rbd_opts = rbd_opts;
304 spin_lock(&rbd_client_list_lock);
305 list_add_tail(&rbdc->node, &rbd_client_list);
306 spin_unlock(&rbd_client_list_lock);
308 mutex_unlock(&ctl_mutex);
310 dout("rbd_client_create created %p\n", rbdc);
/* --- error unwind (labels elided in this excerpt) --- */
314 ceph_destroy_client(rbdc->client);
316 mutex_unlock(&ctl_mutex);
320 ceph_destroy_options(ceph_opts);
/*
 * Client lookup and rbd-specific mount-option parsing.
 *
 * __rbd_client_find(): scan the global client list for an existing client
 * with matching options; CEPH_OPT_NOSHARE forces a private client.
 * Caller must hold rbd_client_list_lock (it walks rbd_client_list bare).
 */
325 * Find a ceph client with specific addr and configuration.
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 struct rbd_client *client_node;
331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334 list_for_each_entry(client_node, &rbd_client_list, node)
335 if (!ceph_compare_options(ceph_opts, client_node->client))
/* Option token table; the enum of Opt_* values is elided in this excerpt. */
348 /* string args above */
351 static match_table_t rbd_opts_tokens = {
352 {Opt_notify_timeout, "notify_timeout=%d"},
354 /* string args above */
/*
 * parse_rbd_opts_token(): callback from ceph_parse_options() for tokens
 * libceph itself does not recognize. Integer-valued tokens are decoded
 * via match_int(); the switch applies the recognized option.
 */
358 static int parse_rbd_opts_token(char *c, void *private)
360 struct rbd_options *rbd_opts = private;
361 substring_t argstr[MAX_OPT_ARGS];
362 int token, intval, ret;
364 token = match_token(c, rbd_opts_tokens, argstr);
368 if (token < Opt_last_int) {
369 ret = match_int(&argstr[0], &intval);
371 pr_err("bad mount option arg (not int) "
375 dout("got int token %d val %d\n", token, intval);
376 } else if (token > Opt_last_int && token < Opt_last_string) {
377 dout("got string token %d val %s\n", token,
380 dout("got token %d\n", token);
384 case Opt_notify_timeout:
385 rbd_opts->notify_timeout = intval;
/*
 * Look up a shareable client for this monitor address/options, creating
 * one if none matches.
 *
 * On the reuse path the freshly parsed ceph_opts (and, presumably, the
 * temporary rbd_opts — the kfree line is elided here) are discarded and
 * the existing client's refcount is bumped.
 * NOTE(review): several parameters and the error path after
 * ceph_parse_options() failure (freeing rbd_opts) are not visible in
 * this excerpt — confirm against the full source.
 */
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
401 struct rbd_client *rbdc;
402 struct ceph_options *ceph_opts;
403 struct rbd_options *rbd_opts;
405 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407 return ERR_PTR(-ENOMEM);
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts)) {
416 return ERR_CAST(ceph_opts);
419 spin_lock(&rbd_client_list_lock);
420 rbdc = __rbd_client_find(ceph_opts);
422 /* using an existing client */
423 kref_get(&rbdc->kref);
424 spin_unlock(&rbd_client_list_lock);
426 ceph_destroy_options(ceph_opts);
431 spin_unlock(&rbd_client_list_lock);
433 rbdc = rbd_client_create(ceph_opts, rbd_opts);
/*
 * Client and request-collection teardown.
 *
 * NOTE(review): the comment at old line 444 says the caller must hold
 * rbd_client_list_lock, yet rbd_client_release() takes that lock itself
 * (lines 451/453). One of the two is stale — verify against the full
 * source and its locking history before relying on either.
 */
442 * Destroy ceph client
444 * Caller must hold rbd_client_list_lock.
446 static void rbd_client_release(struct kref *kref)
448 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450 dout("rbd_release_client %p\n", rbdc);
451 spin_lock(&rbd_client_list_lock);
452 list_del(&rbdc->node);
453 spin_unlock(&rbd_client_list_lock);
455 ceph_destroy_client(rbdc->client);
456 kfree(rbdc->rbd_opts);
/* Drop the device's reference; rbd_client_release() runs on last put. */
461 * Drop reference to ceph client node. If it's not referenced anymore, release
464 static void rbd_put_client(struct rbd_device *rbd_dev)
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
/* Free a request collection once its last sub-request has dropped it. */
471 * Destroy requests collection
473 static void rbd_coll_release(struct kref *kref)
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
478 dout("rbd_coll_release %p\n", coll);
/* Validate the on-disk header magic text before trusting any field. */
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 return !memcmp(&ondisk->text,
485 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
/*
 * Translate the little-endian on-disk header into the in-memory
 * rbd_image_header: snapshot context, snapshot names/sizes, object
 * prefix, and image geometry.
 *
 * NOTE(review): allocation-failure branches, the allocated_snaps
 * parameter declaration, and the error-unwind labels are partially
 * elided in this excerpt; the unwind at the bottom (lines 565-572)
 * frees snap_sizes, snap_names and snapc in reverse allocation order.
 */
489 * Create a new header structure, translate header format from the on-disk
492 static int rbd_header_from_disk(struct rbd_image_header *header,
493 struct rbd_image_header_ondisk *ondisk,
498 if (!rbd_dev_ondisk_valid(ondisk))
/* Guard the snapc allocation size against integer overflow. */
501 snap_count = le32_to_cpu(ondisk->snap_count);
502 if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
505 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
506 snap_count * sizeof(u64),
512 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
513 header->snap_names = kmalloc(header->snap_names_len,
515 if (!header->snap_names)
517 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519 if (!header->snap_sizes)
/* No snapshots: names length must be zero on disk; clear all pointers. */
522 WARN_ON(ondisk->snap_names_len);
523 header->snap_names_len = 0;
524 header->snap_names = NULL;
525 header->snap_sizes = NULL;
/* Copy the object name prefix and force NUL termination. */
528 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530 if (!header->object_prefix)
533 memcpy(header->object_prefix, ondisk->block_name,
534 sizeof(ondisk->block_name));
535 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
537 header->image_size = le64_to_cpu(ondisk->image_size);
538 header->obj_order = ondisk->options.order;
539 header->crypt_type = ondisk->options.crypt_type;
540 header->comp_type = ondisk->options.comp_type;
542 atomic_set(&header->snapc->nref, 1);
543 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
544 header->snapc->num_snaps = snap_count;
545 header->total_snaps = snap_count;
/* Only fill snapshot ids/sizes if the caller's buffer held them all. */
547 if (snap_count && allocated_snaps == snap_count) {
550 for (i = 0; i < snap_count; i++) {
551 header->snapc->snaps[i] =
552 le64_to_cpu(ondisk->snaps[i].id);
553 header->snap_sizes[i] =
554 le64_to_cpu(ondisk->snaps[i].image_size);
557 /* copy snapshot names */
558 memcpy(header->snap_names, &ondisk->snaps[snap_count],
559 header->snap_names_len);
/* --- error unwind: free in reverse order of allocation --- */
565 kfree(header->snap_sizes);
566 header->snap_sizes = NULL;
568 kfree(header->snap_names);
569 header->snap_names = NULL;
571 kfree(header->snapc);
572 header->snapc = NULL;
/*
 * snap_by_name(): walk the packed, NUL-separated snapshot-name blob in
 * parallel with the snapc->snaps[] id array; on a match report the
 * snapshot's id and size through the out pointers.
 */
577 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
581 char *p = header->snap_names;
583 for (i = 0; i < header->total_snaps; i++) {
584 if (!strcmp(snap_name, p)) {
586 /* Found it. Pass back its id and/or size */
589 *seq = header->snapc->snaps[i];
591 *size = header->snap_sizes[i];
594 p += strlen(p) + 1; /* Skip ahead to the next name */
/*
 * rbd_header_set_snap(): bind the device to either the live head
 * (RBD_SNAP_HEAD_NAME, writable, CEPH_NOSNAP) or a named snapshot
 * (read-only). Writes *size with the mapped size. Runs under the
 * header rwsem since it mutates snap_id/snap_exists/read_only.
 */
599 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
603 down_write(&rbd_dev->header_rwsem);
605 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
606 sizeof (RBD_SNAP_HEAD_NAME))) {
607 rbd_dev->snap_id = CEPH_NOSNAP;
608 rbd_dev->snap_exists = false;
609 rbd_dev->read_only = 0;
611 *size = rbd_dev->header.image_size;
615 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
619 rbd_dev->snap_id = snap_id;
620 rbd_dev->snap_exists = true;
621 rbd_dev->read_only = 1;
626 up_write(&rbd_dev->header_rwsem);
/* Free everything rbd_header_from_disk() allocated; snapc is refcounted. */
630 static void rbd_header_free(struct rbd_image_header *header)
632 kfree(header->object_prefix);
633 kfree(header->snap_sizes);
634 kfree(header->snap_names);
635 ceph_put_snap_context(header->snapc);
/*
 * Image-to-object mapping helpers. An image is striped into fixed-size
 * objects of (1 << obj_order) bytes named "<prefix>.<segment-hex>".
 *
 * rbd_get_segment(): compute the object name, intra-object offset and
 * clamped length for image offset `ofs`.
 * NOTE(review): the shifts use `1 <<` with an int — fine while
 * obj_order < 31, which the on-disk format presumably guarantees;
 * confirm in the full source.
 */
639 * get the actual striped segment name, offset and length
641 static u64 rbd_get_segment(struct rbd_image_header *header,
642 const char *object_prefix,
644 char *seg_name, u64 *segofs)
646 u64 seg = ofs >> header->obj_order;
649 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650 "%s.%012llx", object_prefix, seg);
652 ofs = ofs & ((1 << header->obj_order) - 1);
653 len = min_t(u64, len, (1 << header->obj_order) - ofs);
/* Number of objects an [ofs, ofs+len) image range touches (len > 0). */
661 static int rbd_get_num_segments(struct rbd_image_header *header,
664 u64 start_seg = ofs >> header->obj_order;
665 u64 end_seg = (ofs + len - 1) >> header->obj_order;
666 return end_seg - start_seg + 1;
670 * returns the size of an object in the image
672 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 return 1 << header->obj_order;
/* Drop a reference on every bio in a singly linked bi_next chain. */
681 static void bio_chain_put(struct bio *chain)
687 chain = chain->bi_next;
/*
 * Zero the tail of a bio chain starting at byte offset start_ofs —
 * used to zero-fill short reads and ENOENT (hole) reads.
 */
693 * zeros a bio chain, starting at specific offset
695 static void zero_bio_chain(struct bio *chain, int start_ofs)
704 bio_for_each_segment(bv, chain, i) {
705 if (pos + bv->bv_len > start_ofs) {
706 int remainder = max(start_ofs - pos, 0);
707 buf = bvec_kmap_irq(bv, &flags);
708 memset(buf + remainder, 0,
709 bv->bv_len - remainder);
710 bvec_kunmap_irq(buf, &flags);
715 chain = chain->bi_next;
/*
 * Clone the leading `len` bytes of a bio chain so one OSD request never
 * spans an object boundary. May split the final bio, returning the
 * split pair via *bp for the caller (or next call) to release; *next
 * receives the unconsumed remainder of the original chain.
 *
 * NOTE(review): line 753 reads `bp = bio_split(...)` while bp is the
 * struct bio_pair ** parameter, and line 757 then does `&bp->bio1` —
 * as written this would not compile (the upstream code assigns through
 * *bp). Likely an artifact of this excerpt's missing lines; verify
 * against the complete source rather than "fixing" blindly.
 */
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
723 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
724 struct bio_pair **bp,
725 int len, gfp_t gfpmask)
727 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
731 bio_pair_release(*bp);
735 while (old_chain && (total < len)) {
736 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
740 if (total + old_chain->bi_size > len) {
744 * this split can only happen with a single paged bio,
745 * split_bio will BUG_ON if this is not the case
747 dout("bio_chain_clone split! total=%d remaining=%d"
749 total, len - total, old_chain->bi_size);
751 /* split the bio. We'll release it either in the next
752 call, or it will have to be released outside */
753 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
757 __bio_clone(tmp, &bp->bio1);
761 __bio_clone(tmp, old_chain);
762 *next = old_chain->bi_next;
/* After the first allocation, avoid blocking allocations mid-chain. */
766 gfpmask &= ~__GFP_WAIT;
770 new_chain = tail = tmp;
775 old_chain = old_chain->bi_next;
777 total += tmp->bi_size;
783 tail->bi_next = NULL;
/* --- error path: unwind partially built chain --- */
790 dout("bio_chain_clone with err\n");
791 bio_chain_put(new_chain);
/*
 * Allocate a zeroed, NULL-terminated vector of num_ops OSD ops with the
 * first op's opcode/payload_len pre-filled; extent offset/length are
 * filled later in calc_raw_layout().
 */
796 * helpers for osd request op vectors.
798 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
799 int opcode, u32 payload_len)
801 struct ceph_osd_req_op *ops;
803 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
810 * op extent offset and length will be set later on
811 * in calc_raw_layout()
813 ops[0].payload_len = payload_len;
/* Free an op vector from rbd_create_rw_ops(). */
818 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of sub-request `index` and complete the blk-layer
 * request in order: contiguous finished sub-requests starting at
 * num_done are ended under the queue lock. A NULL coll means a single
 * uncollected request, ended directly with blk_end_request().
 */
823 static void rbd_coll_end_req_index(struct request *rq,
824 struct rbd_req_coll *coll,
828 struct request_queue *q;
831 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
832 coll, index, ret, (unsigned long long) len);
838 blk_end_request(rq, ret, len);
844 spin_lock_irq(q->queue_lock);
845 coll->status[index].done = 1;
846 coll->status[index].rc = ret;
847 coll->status[index].bytes = len;
848 max = min = coll->num_done;
849 while (max < coll->total && coll->status[max].done)
852 for (i = min; i<max; i++) {
853 __blk_end_request(rq, coll->status[i].rc,
854 coll->status[i].bytes);
856 kref_put(&coll->kref, rbd_coll_release);
858 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper: complete via the indices stored in the rbd_request. */
861 static void rbd_coll_end_req(struct rbd_request *req,
864 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
/*
 * Core OSD request submission path. Builds a ceph_osd_request for one
 * object (pages or bio payload), computes the raw layout, optionally
 * marks it lingering (for watch), starts it, and — when no async
 * callback was supplied — waits for completion and reports the
 * reassert version through *ver.
 *
 * NOTE(review): roughly a third of this function's lines (several
 * parameters, error checks, the no-wait return) are elided in this
 * excerpt; the trailing lines 971-974 are the allocation/submission
 * failure path. Verify the unwind ordering in the full source.
 */
868 * Send ceph osd request
870 static int rbd_do_request(struct request *rq,
871 struct rbd_device *rbd_dev,
872 struct ceph_snap_context *snapc,
874 const char *object_name, u64 ofs, u64 len,
879 struct ceph_osd_req_op *ops,
880 struct rbd_req_coll *coll,
882 void (*rbd_cb)(struct ceph_osd_request *req,
883 struct ceph_msg *msg),
884 struct ceph_osd_request **linger_req,
887 struct ceph_osd_request *req;
888 struct ceph_file_layout *layout;
891 struct timespec mtime = CURRENT_TIME;
892 struct rbd_request *req_data;
893 struct ceph_osd_request_head *reqhead;
894 struct ceph_osd_client *osdc;
896 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: complete this sub-request with an error. */
899 rbd_coll_end_req_index(rq, coll, coll_index,
905 req_data->coll = coll;
906 req_data->coll_index = coll_index;
909 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
910 (unsigned long long) ofs, (unsigned long long) len);
912 osdc = &rbd_dev->rbd_client->client->osdc;
913 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
914 false, GFP_NOIO, pages, bio);
920 req->r_callback = rbd_cb;
924 req_data->pages = pages;
927 req->r_priv = req_data;
929 reqhead = req->r_request->front.iov_base;
930 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
932 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
933 req->r_oid_len = strlen(req->r_oid);
/* One object per request: stripe unit == object size, single stripe. */
935 layout = &req->r_file_layout;
936 memset(layout, 0, sizeof(*layout));
937 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938 layout->fl_stripe_count = cpu_to_le32(1);
939 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
940 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
941 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
944 ceph_osdc_build_request(req, ofs, &len,
948 req->r_oid, req->r_oid_len);
/* Lingering requests (watch) are re-sent by osdc after reconnect. */
951 ceph_osdc_set_request_linger(osdc, req);
955 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait and surface the reassert version. */
960 ret = ceph_osdc_wait_request(osdc, req);
962 *ver = le64_to_cpu(req->r_reassert_version.version);
963 dout("reassert_ver=%llu\n",
965 le64_to_cpu(req->r_reassert_version.version));
966 ceph_osdc_put_request(req);
/* --- failure path --- */
971 bio_chain_put(req_data->bio);
972 ceph_osdc_put_request(req);
974 rbd_coll_end_req(req_data, ret, len);
/*
 * Async completion callback for data OSD requests. Decodes the reply,
 * zero-fills reads that hit holes (-ENOENT) or came back short, then
 * completes the corresponding sub-request and drops per-request state.
 */
980 * Ceph osd op callback
982 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 struct rbd_request *req_data = req->r_priv;
985 struct ceph_osd_reply_head *replyhead;
986 struct ceph_osd_op *op;
992 replyhead = msg->front.iov_base;
993 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
994 op = (void *)(replyhead + 1);
995 rc = le32_to_cpu(replyhead->result);
996 bytes = le64_to_cpu(op->extent.length);
997 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
999 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1000 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object on read == hole: treat as all-zero success. */
1002 if (rc == -ENOENT && read_op) {
1003 zero_bio_chain(req_data->bio, 0);
1005 } else if (rc == 0 && read_op && bytes < req_data->len) {
1006 zero_bio_chain(req_data->bio, bytes);
1007 bytes = req_data->len;
1010 rbd_coll_end_req(req_data, rc, bytes);
1013 bio_chain_put(req_data->bio);
1015 ceph_osdc_put_request(req);
/* Minimal callback for requests whose reply carries nothing we need. */
1019 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1021 ceph_osdc_put_request(req);
/*
 * Synchronous OSD op wrapper: allocate a page vector to back the
 * transfer, run rbd_do_request() with no callback (so it waits), and
 * for reads copy the result into the caller's buffer.
 *
 * NOTE(review): intermediate parameters and the rbd_do_request argument
 * tail are elided here; `buf` may be NULL for ops with no data
 * (watch/unwatch callers pass NULL).
 */
1025 * Do a synchronous ceph osd operation
1027 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1028 struct ceph_snap_context *snapc,
1031 struct ceph_osd_req_op *ops,
1032 const char *object_name,
1035 struct ceph_osd_request **linger_req,
1039 struct page **pages;
1042 BUG_ON(ops == NULL);
1044 num_pages = calc_pages_for(ofs , len);
1045 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047 return PTR_ERR(pages);
1049 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1050 object_name, ofs, len, NULL,
/* Read data back out of the page vector; ret is the byte count. */
1060 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1061 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1064 ceph_release_page_vector(pages, num_pages);
/*
 * Asynchronous per-segment I/O: map the image range onto one object via
 * rbd_get_segment(), build a one-op vector and fire rbd_do_request()
 * with rbd_req_cb as completion. The bio chain passed in was already
 * clipped to the segment, hence the BUG_ON sanity check.
 */
1069 * Do an asynchronous ceph osd operation
1071 static int rbd_do_op(struct request *rq,
1072 struct rbd_device *rbd_dev,
1073 struct ceph_snap_context *snapc,
1075 int opcode, int flags,
1078 struct rbd_req_coll *coll,
1085 struct ceph_osd_req_op *ops;
1088 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1092 seg_len = rbd_get_segment(&rbd_dev->header,
1093 rbd_dev->header.object_prefix,
1095 seg_name, &seg_ofs);
/* Writes carry their payload length; reads have none. */
1097 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1100 ops = rbd_create_rw_ops(1, opcode, payload_len);
1104 /* we've taken care of segment sizes earlier when we
1105 cloned the bios. We should never have a segment
1106 truncated at this point */
1107 BUG_ON(seg_len < len);
1109 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110 seg_name, seg_ofs, seg_len,
1116 rbd_req_cb, 0, NULL);
1118 rbd_destroy_ops(ops);
/* Async write to the head image: carries the snapshot context. */
1125 * Request async osd write
1127 static int rbd_req_write(struct request *rq,
1128 struct rbd_device *rbd_dev,
1129 struct ceph_snap_context *snapc,
1132 struct rbd_req_coll *coll,
1135 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1138 ofs, len, bio, coll, coll_index);
/* Async read: no snap context; snapid selects what to read (elided arg). */
1142 * Request async osd read
1144 static int rbd_req_read(struct request *rq,
1145 struct rbd_device *rbd_dev,
1149 struct rbd_req_coll *coll,
1152 return rbd_do_op(rq, rbd_dev, NULL,
1156 ofs, len, bio, coll, coll_index);
/*
 * Synchronous object read into a kernel buffer; used to fetch the
 * image header. Optionally returns the object version through *ver.
 */
1160 * Request sync osd read
1162 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1164 const char *object_name,
1169 struct ceph_osd_req_op *ops;
1172 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1176 ret = rbd_req_sync_op(rbd_dev, NULL,
1179 ops, object_name, ofs, len, buf, NULL, ver);
1180 rbd_destroy_ops(ops);
/*
 * Acknowledge a watch notification so the notifier's synchronous
 * notify can complete. Fire-and-forget: rbd_simple_req_cb just drops
 * the request.
 */
1186 * Request sync osd watch
1188 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1192 struct ceph_osd_req_op *ops;
1195 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1199 ops[0].watch.ver = cpu_to_le64(ver);
1200 ops[0].watch.cookie = notify_id;
1201 ops[0].watch.flag = 0;
1203 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1204 rbd_dev->header_name, 0, 0, NULL,
1209 rbd_simple_req_cb, 0, NULL);
1211 rbd_destroy_ops(ops);
/*
 * Watch event callback: a peer changed the image (e.g. snapshot
 * created) — re-read the header, then ack the notification.
 */
1215 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1217 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1224 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1225 rbd_dev->header_name, (unsigned long long) notify_id,
1226 (unsigned int) opcode);
1227 rc = rbd_refresh_header(rbd_dev, &hver);
1229 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230 " update snaps: %d\n", rbd_dev->major, rc);
1232 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
/*
 * Register a watch on the image header object so we hear about header
 * changes. The osd event supplies the cookie; the resulting request is
 * kept lingering in rbd_dev->watch_request so it survives reconnects.
 */
1236 * Request sync osd watch
1238 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1240 struct ceph_osd_req_op *ops;
1241 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1244 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1248 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249 (void *)rbd_dev, &rbd_dev->watch_event);
1253 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255 ops[0].watch.flag = 1;
1257 ret = rbd_req_sync_op(rbd_dev, NULL,
1259 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261 rbd_dev->header_name,
1263 &rbd_dev->watch_request, NULL);
1268 rbd_destroy_ops(ops);
/* --- error path: undo the event registration --- */
1272 ceph_osdc_cancel_event(rbd_dev->watch_event);
1273 rbd_dev->watch_event = NULL;
1275 rbd_destroy_ops(ops);
/*
 * Tear down the watch established above: flag=0 unregisters, then the
 * event is cancelled. Uses the same cookie as registration.
 */
1280 * Request sync osd unwatch
1282 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1284 struct ceph_osd_req_op *ops;
1287 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1291 ops[0].watch.ver = 0;
1292 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293 ops[0].watch.flag = 0;
1295 ret = rbd_req_sync_op(rbd_dev, NULL,
1297 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 rbd_dev->header_name,
1300 0, 0, NULL, NULL, NULL);
1303 rbd_destroy_ops(ops);
1304 ceph_osdc_cancel_event(rbd_dev->watch_event);
1305 rbd_dev->watch_event = NULL;
/* Context handed to rbd_notify_cb via the osd event's private data. */
1309 struct rbd_notify_info {
1310 struct rbd_device *rbd_dev;
/* Callback for our own notify completing; logs only. */
1313 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1315 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1319 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320 rbd_dev->header_name, (unsigned long long) notify_id,
1321 (unsigned int) opcode);
/*
 * Send a NOTIFY on the header object (e.g. after creating a snapshot)
 * and wait for all watchers to acknowledge, bounded by
 * CEPH_OSD_TIMEOUT_DEFAULT. Payload is two u32s (version + timeout).
 */
1325 * Request sync osd notify
1327 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1329 struct ceph_osd_req_op *ops;
1330 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331 struct ceph_osd_event *event;
1332 struct rbd_notify_info info;
1333 int payload_len = sizeof(u32) + sizeof(u32);
1336 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1340 info.rbd_dev = rbd_dev;
1342 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343 (void *)&info, &event);
1347 ops[0].watch.ver = 1;
1348 ops[0].watch.flag = 1;
1349 ops[0].watch.cookie = event->cookie;
1350 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351 ops[0].watch.timeout = 12;
1353 ret = rbd_req_sync_op(rbd_dev, NULL,
1355 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1357 rbd_dev->header_name,
1358 0, 0, NULL, NULL, NULL);
1362 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363 dout("ceph_osdc_wait_event returned %d\n", ret);
1364 rbd_destroy_ops(ops);
/* --- error path --- */
1368 ceph_osdc_cancel_event(event);
1370 rbd_destroy_ops(ops);
/*
 * Invoke a server-side object-class method (CEPH_OSD_OP_CALL), e.g.
 * rbd.snap_add, passing `data` of `len` bytes as input. Returns the
 * op result; *ver gets the resulting object version.
 * NOTE(review): the stale "Request sync osd read" banner below belongs
 * to this excerpt's original comment text — the op is a write-flagged
 * class call, not a read.
 */
1375 * Request sync osd read
1377 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1378 const char *object_name,
1379 const char *class_name,
1380 const char *method_name,
1385 struct ceph_osd_req_op *ops;
1386 int class_name_len = strlen(class_name);
1387 int method_name_len = strlen(method_name);
1390 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1391 class_name_len + method_name_len + len);
1395 ops[0].cls.class_name = class_name;
1396 ops[0].cls.class_len = (__u8) class_name_len;
1397 ops[0].cls.method_name = method_name;
1398 ops[0].cls.method_len = (__u8) method_name_len;
1399 ops[0].cls.argc = 0;
1400 ops[0].cls.indata = data;
1401 ops[0].cls.indata_len = len;
1403 ret = rbd_req_sync_op(rbd_dev, NULL,
1405 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1407 object_name, 0, 0, NULL, NULL, ver);
1409 rbd_destroy_ops(ops);
1411 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection sized for num_reqs sub-requests,
 * with its kref initialized to one (the submitter's reference).
 */
1415 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 struct rbd_req_coll *coll =
1418 kzalloc(sizeof(struct rbd_req_coll) +
1419 sizeof(struct rbd_req_status) * num_reqs,
1424 coll->total = num_reqs;
1425 kref_init(&coll->kref);
/*
 * Request-queue strategy function. For each blk request: filter
 * non-FS commands, reject writes to read-only mappings, snapshot the
 * header state under header_rwsem (fail I/O to a deleted snapshot),
 * split the request into per-object segments, and dispatch each
 * segment as an async OSD read or write tracked by a rbd_req_coll.
 *
 * Called with the queue lock held; it is dropped around the blocking
 * work (rwsem, allocations, submission) and retaken before fetching
 * the next request.
 *
 * NOTE(review): the inner per-segment loop header and several closing
 * braces are elided in this excerpt, so loop boundaries below are
 * inferred — verify against the full source.
 */
1430 * block device queue callback
1432 static void rbd_rq_fn(struct request_queue *q)
1434 struct rbd_device *rbd_dev = q->queuedata;
1436 struct bio_pair *bp = NULL;
1438 while ((rq = blk_fetch_request(q))) {
1440 struct bio *rq_bio, *next_bio = NULL;
1445 int num_segs, cur_seg = 0;
1446 struct rbd_req_coll *coll;
1447 struct ceph_snap_context *snapc;
1449 /* peek at request from block layer */
1453 dout("fetched request\n");
1455 /* filter out block requests we don't understand */
1456 if ((rq->cmd_type != REQ_TYPE_FS)) {
1457 __blk_end_request_all(rq, 0);
1461 /* deduce our operation (read, write) */
1462 do_write = (rq_data_dir(rq) == WRITE);
1464 size = blk_rq_bytes(rq);
1465 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1467 if (do_write && rbd_dev->read_only) {
1468 __blk_end_request_all(rq, -EROFS);
1472 spin_unlock_irq(q->queue_lock);
1474 down_read(&rbd_dev->header_rwsem);
/* Mapped snapshot was deleted underneath us: fail with -ENXIO. */
1476 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477 up_read(&rbd_dev->header_rwsem);
1478 dout("request for non-existent snapshot");
1479 spin_lock_irq(q->queue_lock);
1480 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context so concurrent header refreshes can't free it. */
1484 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1486 up_read(&rbd_dev->header_rwsem);
1488 dout("%s 0x%x bytes at 0x%llx\n",
1489 do_write ? "write" : "read",
1490 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1492 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493 coll = rbd_alloc_coll(num_segs);
1495 spin_lock_irq(q->queue_lock);
1496 __blk_end_request_all(rq, -ENOMEM);
1497 ceph_put_snap_context(snapc);
/* Per-segment dispatch loop (loop header elided in this excerpt). */
1502 /* a bio clone to be passed down to OSD req */
1503 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504 op_size = rbd_get_segment(&rbd_dev->header,
1505 rbd_dev->header.object_prefix,
1508 kref_get(&coll->kref);
1509 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510 op_size, GFP_ATOMIC);
1512 rbd_coll_end_req_index(rq, coll, cur_seg,
1518 /* init OSD command: write or read */
1520 rbd_req_write(rq, rbd_dev,
1526 rbd_req_read(rq, rbd_dev,
1539 kref_put(&coll->kref, rbd_coll_release);
1542 bio_pair_release(bp);
1543 spin_lock_irq(q->queue_lock);
1545 ceph_put_snap_context(snapc);
/*
 * bvec merge callback: limit how many bytes may be appended to a bio
 * so it does not cross an object (chunk) boundary. Single-page bios
 * that still cross are tolerated and split later in bio_chain_clone.
 * Returns the number of bytes of this bvec that may be accepted.
 */
1550 * a queue callback. Makes sure that we don't create a bio that spans across
1551 * multiple osd objects. One exception would be with a single page bios,
1552 * which we handle later at bio_chain_clone
1554 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1555 struct bio_vec *bvec)
1557 struct rbd_device *rbd_dev = q->queuedata;
1558 unsigned int chunk_sectors;
1560 unsigned int bio_sectors;
1563 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1564 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1565 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1567 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1568 + bio_sectors)) << SECTOR_SHIFT;
1570 max = 0; /* bio_add cannot handle a negative return */
/* An empty bio may always take one bvec, even if it will need a split. */
1571 if (max <= bvec->bv_len && bio_sectors == 0)
1572 return bvec->bv_len;
/*
 * Release disk resources: free the header, remove the gendisk if it
 * was added, and clean up its request queue.
 */
1576 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 struct gendisk *disk = rbd_dev->disk;
1583 rbd_header_free(&rbd_dev->header);
1585 if (disk->flags & GENHD_FL_UP)
1588 blk_cleanup_queue(disk->queue);
/*
 * Read and parse the on-disk image header. Because the snapshot count
 * can change between reads, this loops (loop construct partially elided
 * in this excerpt): read with the current size guess, parse, and if the
 * parsed total_snaps disagrees with what was allocated, free the result,
 * grow the buffer and retry. On success stores the object version in
 * header->obj_version.
 */
1593 * reload the ondisk the header
1595 static int rbd_read_header(struct rbd_device *rbd_dev,
1596 struct rbd_image_header *header)
1599 struct rbd_image_header_ondisk *dh;
1605 * First reads the fixed-size header to determine the number
1606 * of snapshots, then re-reads it, along with all snapshot
1607 * records as well as their stored names.
1611 dh = kmalloc(len, GFP_KERNEL);
1615 rc = rbd_req_sync_read(rbd_dev,
1617 rbd_dev->header_name,
1623 rc = rbd_header_from_disk(header, dh, snap_count);
1626 pr_warning("unrecognized header format"
1628 rbd_dev->image_name);
/* Snapshot count stable across the read: done. */
1632 if (snap_count == header->total_snaps)
1635 snap_count = header->total_snaps;
1636 len = sizeof (*dh) +
1637 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1638 header->snap_names_len;
1640 rbd_header_free(header);
1643 header->obj_version = ver;
/*
 * Create a snapshot: only valid when mapped at the head. Obtains a new
 * snap id from the monitor, encodes (name, id) and invokes the rbd
 * class's snap_add method on the header object via rbd_req_sync_exec().
 * NOTE(review): the "bad" encode-failure label and the gfp_flags
 * parameter declaration are elided in this excerpt.
 */
1653 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654 const char *snap_name,
1657 int name_len = strlen(snap_name);
1661 struct ceph_mon_client *monc;
1663 /* we should create a snapshot only if we're pointing at the head */
1664 if (rbd_dev->snap_id != CEPH_NOSNAP)
1667 monc = &rbd_dev->rbd_client->client->monc;
1668 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1669 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* Buffer holds a length-prefixed string plus a u64: name_len + 16 bytes. */
1673 data = kmalloc(name_len + 16, gfp_flags);
1678 e = data + name_len + 16;
1680 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1681 ceph_encode_64_safe(&p, e, new_snapid, bad);
1683 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1685 data, p - data, NULL);
1689 return ret < 0 ? ret : 0;
/* Unregister every snapshot device hanging off this rbd_device. */
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1696 struct rbd_snap *snap;
1697 struct rbd_snap *next;
1699 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700 __rbd_remove_snap_dev(snap);
/*
 * Re-read the header and swap the new contents into rbd_dev->header
 * under header_rwsem. The old snapc is released via its refcount (in-
 * flight requests may still hold it); snap_sizes/snap_names are freed
 * and replaced. The device capacity is updated only when mapped at the
 * head. Callers receive the new object version through *hver.
 */
1704 * only read the first part of the ondisk header, without the snaps info
1706 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1709 struct rbd_image_header h;
1711 ret = rbd_read_header(rbd_dev, &h);
1715 down_write(&rbd_dev->header_rwsem);
/* Resize the block device only when tracking the live image. */
1718 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1719 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1721 dout("setting size to %llu sectors", (unsigned long long) size);
1722 set_capacity(rbd_dev->disk, size);
1725 /* rbd_dev->header.object_prefix shouldn't change */
1726 kfree(rbd_dev->header.snap_sizes);
1727 kfree(rbd_dev->header.snap_names);
1728 /* osd requests may still refer to snapc */
1729 ceph_put_snap_context(rbd_dev->header.snapc);
1732 *hver = h.obj_version;
1733 rbd_dev->header.obj_version = h.obj_version;
1734 rbd_dev->header.image_size = h.image_size;
1735 rbd_dev->header.total_snaps = h.total_snaps;
1736 rbd_dev->header.snapc = h.snapc;
1737 rbd_dev->header.snap_names = h.snap_names;
1738 rbd_dev->header.snap_names_len = h.snap_names_len;
1739 rbd_dev->header.snap_sizes = h.snap_sizes;
1740 /* Free the extra copy of the object prefix */
1741 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1742 kfree(h.object_prefix);
/* Rebuild the snapshot device list to match the new header. */
1744 ret = __rbd_init_snaps_header(rbd_dev);
1746 up_write(&rbd_dev->header_rwsem);
/* Locked wrapper: refresh under ctl_mutex (nested class, see callers). */
1751 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1755 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1756 ret = __rbd_refresh_header(rbd_dev, hver);
1757 mutex_unlock(&ctl_mutex);
/*
 * Read the image header from the OSDs, build the snapshot list, select
 * the mapped snapshot, then allocate and announce the gendisk and
 * request queue for this rbd device.
 *
 * NOTE(review): extraction appears to have dropped the error-handling
 * lines (rc checks, out/out_disk labels, add_disk() and the
 * disk->queue assignment).  Confirm against the upstream source before
 * relying on this text.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
	struct gendisk *disk;
	struct request_queue *q;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);

	rc = rbd_header_set_snap(rbd_dev, &total_size);

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* request queue, fed by rbd_rq_fn under rbd_dev->lock */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
1839 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1841 return container_of(dev, struct rbd_device, dev);
1844 static ssize_t rbd_size_show(struct device *dev,
1845 struct device_attribute *attr, char *buf)
1847 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1850 down_read(&rbd_dev->header_rwsem);
1851 size = get_capacity(rbd_dev->disk);
1852 up_read(&rbd_dev->header_rwsem);
1854 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1857 static ssize_t rbd_major_show(struct device *dev,
1858 struct device_attribute *attr, char *buf)
1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862 return sprintf(buf, "%d\n", rbd_dev->major);
1865 static ssize_t rbd_client_id_show(struct device *dev,
1866 struct device_attribute *attr, char *buf)
1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870 return sprintf(buf, "client%lld\n",
1871 ceph_client_id(rbd_dev->rbd_client->client));
1874 static ssize_t rbd_pool_show(struct device *dev,
1875 struct device_attribute *attr, char *buf)
1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1882 static ssize_t rbd_pool_id_show(struct device *dev,
1883 struct device_attribute *attr, char *buf)
1885 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1890 static ssize_t rbd_name_show(struct device *dev,
1891 struct device_attribute *attr, char *buf)
1893 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 return sprintf(buf, "%s\n", rbd_dev->image_name);
1898 static ssize_t rbd_snap_show(struct device *dev,
1899 struct device_attribute *attr,
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1907 static ssize_t rbd_image_refresh(struct device *dev,
1908 struct device_attribute *attr,
1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1915 ret = rbd_refresh_header(rbd_dev, NULL);
1917 return ret < 0 ? ret : size;
/* Per-device sysfs attributes, exposed under /sys/bus/rbd/devices/<id>/. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	/* NOTE(review): the closing NULL terminator line is not visible in
	 * this extraction — the array must end with NULL. */

static struct attribute_group rbd_attr_group = {
	/* NOTE(review): the ".attrs = rbd_attrs" initializer line is not
	 * visible in this extraction. */

static const struct attribute_group *rbd_attr_groups[] = {
	/* NOTE(review): the group pointer and NULL terminator lines are not
	 * visible in this extraction. */

/* Empty release: actual teardown is done by rbd_dev_release() (bus-level). */
static void rbd_sysfs_dev_release(struct device *dev)

static struct device_type rbd_device_type = {
	.groups = rbd_attr_groups,
	.release = rbd_sysfs_dev_release,
1967 static ssize_t rbd_snap_size_show(struct device *dev,
1968 struct device_attribute *attr,
1971 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1973 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1976 static ssize_t rbd_snap_id_show(struct device *dev,
1977 struct device_attribute *attr,
1980 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1982 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes, under the device's snap_<name>/ dir. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	/* NOTE(review): the NULL terminator line is not visible in this
	 * extraction. */

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,

/* Device release for a snapshot: frees the rbd_snap once the last
 * reference is dropped.  NOTE(review): the kfree(snap->name)/kfree(snap)
 * lines appear to have been dropped by extraction. */
static void rbd_snap_dev_release(struct device *dev)
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,

static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
2015 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2017 list_del(&snap->node);
2018 device_unregister(&snap->dev);
2021 static int rbd_register_snap_dev(struct rbd_snap *snap,
2022 struct device *parent)
2024 struct device *dev = &snap->dev;
2027 dev->type = &rbd_snap_device_type;
2028 dev->parent = parent;
2029 dev->release = rbd_snap_dev_release;
2030 dev_set_name(dev, "snap_%s", snap->name);
2031 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header snapshot slot @i with the given name,
 * and register it in sysfs if the parent rbd device is already
 * registered.  Returns the new snap, or ERR_PTR() on failure.
 *
 * NOTE(review): the error-path lines (ret initialization, the label the
 * trailing ERR_PTR(ret) return belongs to, kfree of name/snap) are not
 * visible in this extraction.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   int i, const char *name)
	struct rbd_snap *snap;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
		return ERR_PTR(-ENOMEM);

	snap->name = kstrdup(name, GFP_KERNEL);

	/* size and id come from the parallel arrays in the parsed header */
	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);

	return ERR_PTR(ret);
/*
 * search for the previous snap in a null delimited string list
 *
 * NOTE(review): only the lower-bound guard is visible here; the backward
 * scan over the '\0'-separated name list appears to have been dropped by
 * extraction.  @start is the beginning of the packed name buffer.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
	if (name < start + 2)
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name).
 *
 * NOTE(review): several lines of this function (braces, continue/return
 * statements, the IS_ERR checks around __rbd_add_snap_dev, the cur_id
 * declaration) are missing from this extraction; verify the control
 * flow against the upstream source.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	struct list_head *p, *n;

	/* names are packed newest-first; start past the end and walk back */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	/* walk the existing snap list from the tail (oldest) backward */
	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {

		old_snap = list_entry(p, struct rbd_snap, node);

		cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed. If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(old_snap);
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			name = rbd_prev_snap_name(name, first_name);
		/* scan header snaps that are older than the current entry */
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
			/* a new snapshot */
			snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
				return PTR_ERR(snap);

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);

	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
			return PTR_ERR(snap);
		list_add(&snap->node, &rbd_dev->snaps);
2165 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2169 struct rbd_snap *snap;
2171 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2172 dev = &rbd_dev->dev;
2174 dev->bus = &rbd_bus_type;
2175 dev->type = &rbd_device_type;
2176 dev->parent = &rbd_root_dev;
2177 dev->release = rbd_dev_release;
2178 dev_set_name(dev, "%d", rbd_dev->dev_id);
2179 ret = device_register(dev);
2183 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2184 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2189 mutex_unlock(&ctl_mutex);
2193 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2195 device_unregister(&rbd_dev->dev);
2198 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2203 ret = rbd_req_sync_watch(rbd_dev);
2204 if (ret == -ERANGE) {
2205 rc = rbd_refresh_header(rbd_dev, NULL);
2209 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 (see rbd_id_get()). */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2217 * Get a unique rbd identifier for the given new rbd_dev, and add
2218 * the rbd_dev to the global list. The minimum rbd id is 1.
2220 static void rbd_id_get(struct rbd_device *rbd_dev)
2222 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2224 spin_lock(&rbd_dev_list_lock);
2225 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226 spin_unlock(&rbd_dev_list_lock);
2230 * Remove an rbd_dev from the global list, and record that its
2231 * identifier is no longer in use.
2233 static void rbd_id_put(struct rbd_device *rbd_dev)
2235 struct list_head *tmp;
2236 int rbd_id = rbd_dev->dev_id;
2241 spin_lock(&rbd_dev_list_lock);
2242 list_del_init(&rbd_dev->node);
2245 * If the id being "put" is not the current maximum, there
2246 * is nothing special we need to do.
2248 if (rbd_id != atomic64_read(&rbd_id_max)) {
2249 spin_unlock(&rbd_dev_list_lock);
2254 * We need to update the current maximum id. Search the
2255 * list to find out what it is. We're more likely to find
2256 * the maximum at the end, so search the list backward.
2259 list_for_each_prev(tmp, &rbd_dev_list) {
2260 struct rbd_device *rbd_dev;
2262 rbd_dev = list_entry(tmp, struct rbd_device, node);
2263 if (rbd_id > max_id)
2266 spin_unlock(&rbd_dev_list_lock);
2269 * The max id could have been updated by rbd_id_get(), in
2270 * which case it now accurately reflects the new maximum.
2271 * Be careful not to overwrite the maximum value in that
2274 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;	/* advance past the token even when it didn't fit */

	return len;
}
2327 * Finds the next token in *buf, dynamically allocates a buffer big
2328 * enough to hold a copy of it, and copies the token into the new
2329 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2330 * that a duplicate buffer is created even for a zero-length token.
2332 * Returns a pointer to the newly-allocated duplicate, or a null
2333 * pointer if memory for the duplicate was not available. If
2334 * the lenp argument is a non-null pointer, the length of the token
2335 * (not including the '\0') is returned in *lenp.
2337 * If successful, the *buf pointer will be updated to point beyond
2338 * the end of the found token.
2340 * Note: uses GFP_KERNEL for allocation.
2342 static inline char *dup_token(const char **buf, size_t *lenp)
2347 len = next_token(buf);
2348 dup = kmalloc(len + 1, GFP_KERNEL);
2352 memcpy(dup, *buf, len);
2353 *(dup + len) = '\0';
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * the sysfs "add" interface.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 *
 * NOTE(review): the error-branch lines (-EINVAL/-ENOMEM returns, the
 * err_out label targets, and the empty-snap-name length check before the
 * default-name substitution) are missing from this extraction.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      size_t options_size)

	/* The first four tokens are required */

	len = next_token(&buf);
	/* mon_addrs points into buf; the size includes the trailing '\0' */
	*mon_addrs_size = len + 1;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)

	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
					+ sizeof (RBD_SUFFIX),
	if (!rbd_dev->header_name)
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)

	/* Replace the empty name with the default */
	kfree(rbd_dev->snap_name);
		= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
	if (!rbd_dev->snap_name)

	memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		sizeof (RBD_SNAP_HEAD_NAME));

	/* error unwind: free whatever was built up, newest first */
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;
/*
 * sysfs "add" store for the rbd bus: parses "<mon addrs> <options>
 * <pool> <image> [<snap>]", creates/reuses a ceph client, registers the
 * block device and sysfs entries, and maps the image.
 *
 * NOTE(review): many error-handling lines (allocation checks, the
 * err_out_* labels themselves, the success return of count) are missing
 * from this extraction; the cleanup sequence near the bottom should be
 * verified against the upstream source.
 */
static ssize_t rbd_add(struct bus_type *bus,
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;

	/* pin the module while a device exists; dropped in release path */
	if (!try_module_get(THIS_MODULE))

	options = kmalloc(count, GFP_KERNEL);
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,

	/* mon_addrs_size includes the trailing '\0'; pass the bare length */
	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);

	/* resolve the pool name to its numeric id */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);

	rc = rbd_init_watch_dev(rbd_dev);

	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	/* error unwind: undo registrations in reverse order */
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_put_client(rbd_dev);
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	rbd_id_put(rbd_dev);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
2552 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2554 struct list_head *tmp;
2555 struct rbd_device *rbd_dev;
2557 spin_lock(&rbd_dev_list_lock);
2558 list_for_each(tmp, &rbd_dev_list) {
2559 rbd_dev = list_entry(tmp, struct rbd_device, node);
2560 if (rbd_dev->dev_id == dev_id) {
2561 spin_unlock(&rbd_dev_list_lock);
2565 spin_unlock(&rbd_dev_list_lock);
/*
 * Final release callback for the rbd device's struct device: tear down
 * the header watch, drop the ceph client, free the disk/blkdev, free
 * the name strings, release the id, and drop the module reference.
 * Invoked by the driver core once the last sysfs reference is gone.
 *
 * NOTE(review): some lines (e.g. the kfree(rbd_dev) expected before
 * module_put()) are not visible in this extraction.
 */
static void rbd_dev_release(struct device *dev)
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
2600 static ssize_t rbd_remove(struct bus_type *bus,
2604 struct rbd_device *rbd_dev = NULL;
2609 rc = strict_strtoul(buf, 10, &ul);
2613 /* convert to int; abort if we lost anything in the conversion */
2614 target_id = (int) ul;
2615 if (target_id != ul)
2618 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2620 rbd_dev = __rbd_get_dev(target_id);
2626 __rbd_remove_all_snaps(rbd_dev);
2627 rbd_bus_del_dev(rbd_dev);
2630 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store: create a named snapshot on the OSDs, then
 * refresh the cached header and (best effort) notify watchers.
 *
 * NOTE(review): the allocation-failure return, the error-label cleanup
 * (kfree(name)) and the final returns are not visible in this
 * extraction.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* count + 1 leaves room for the terminating '\0' */
	char *name = kmalloc(count + 1, GFP_KERNEL);

	/* size == count copies at most count-1 bytes, dropping the last
	 * byte of buf (presumably the trailing newline — verify) */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,

	ret = __rbd_refresh_header(rbd_dev, NULL);

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	mutex_unlock(&ctl_mutex);
2676 * create control files in sysfs
2679 static int rbd_sysfs_init(void)
2683 ret = device_register(&rbd_root_dev);
2687 ret = bus_register(&rbd_bus_type);
2689 device_unregister(&rbd_root_dev);
2694 static void rbd_sysfs_cleanup(void)
2696 bus_unregister(&rbd_bus_type);
2697 device_unregister(&rbd_root_dev);
2700 int __init rbd_init(void)
2704 rc = rbd_sysfs_init();
2707 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2711 void __exit rbd_exit(void)
2713 rbd_sysfs_cleanup();
/* Module entry/exit hookup and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");