2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
108 * a request completion status
110 struct rbd_req_status {
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
/*
 * Release callback for the rbd root device.  rbd_root_dev is a
 * statically allocated singleton, so there is nothing to free; this
 * exists only to satisfy the driver-core requirement that every
 * device have a release method.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 rbd_get_dev(rbd_dev);
251 set_device_ro(bdev, rbd_dev->read_only);
253 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
261 struct rbd_device *rbd_dev = disk->private_data;
263 rbd_put_dev(rbd_dev);
268 static const struct block_device_operations rbd_bd_ops = {
269 .owner = THIS_MODULE,
271 .release = rbd_release,
275 * Initialize an rbd client instance.
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279 struct rbd_options *rbd_opts)
281 struct rbd_client *rbdc;
284 dout("rbd_client_create\n");
285 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
289 kref_init(&rbdc->kref);
290 INIT_LIST_HEAD(&rbdc->node);
292 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295 if (IS_ERR(rbdc->client))
297 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
299 ret = ceph_open_session(rbdc->client);
303 rbdc->rbd_opts = rbd_opts;
305 spin_lock(&rbd_client_list_lock);
306 list_add_tail(&rbdc->node, &rbd_client_list);
307 spin_unlock(&rbd_client_list_lock);
309 mutex_unlock(&ctl_mutex);
311 dout("rbd_client_create created %p\n", rbdc);
315 ceph_destroy_client(rbdc->client);
317 mutex_unlock(&ctl_mutex);
321 ceph_destroy_options(ceph_opts);
326 * Find a ceph client with specific addr and configuration.
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
330 struct rbd_client *client_node;
332 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335 list_for_each_entry(client_node, &rbd_client_list, node)
336 if (!ceph_compare_options(ceph_opts, client_node->client))
349 /* string args above */
352 static match_table_t rbd_opts_tokens = {
353 {Opt_notify_timeout, "notify_timeout=%d"},
355 /* string args above */
359 static int parse_rbd_opts_token(char *c, void *private)
361 struct rbd_options *rbd_opts = private;
362 substring_t argstr[MAX_OPT_ARGS];
363 int token, intval, ret;
365 token = match_token(c, rbd_opts_tokens, argstr);
369 if (token < Opt_last_int) {
370 ret = match_int(&argstr[0], &intval);
372 pr_err("bad mount option arg (not int) "
376 dout("got int token %d val %d\n", token, intval);
377 } else if (token > Opt_last_int && token < Opt_last_string) {
378 dout("got string token %d val %s\n", token,
381 dout("got token %d\n", token);
385 case Opt_notify_timeout:
386 rbd_opts->notify_timeout = intval;
395 * Get a ceph client with specific addr and configuration, if one does
396 * not exist create it.
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
402 struct rbd_client *rbdc;
403 struct ceph_options *ceph_opts;
404 struct rbd_options *rbd_opts;
406 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
408 return ERR_PTR(-ENOMEM);
410 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
412 ceph_opts = ceph_parse_options(options, mon_addr,
413 mon_addr + mon_addr_len,
414 parse_rbd_opts_token, rbd_opts);
415 if (IS_ERR(ceph_opts)) {
417 return ERR_CAST(ceph_opts);
420 spin_lock(&rbd_client_list_lock);
421 rbdc = __rbd_client_find(ceph_opts);
423 /* using an existing client */
424 kref_get(&rbdc->kref);
425 spin_unlock(&rbd_client_list_lock);
427 ceph_destroy_options(ceph_opts);
432 spin_unlock(&rbd_client_list_lock);
434 rbdc = rbd_client_create(ceph_opts, rbd_opts);
443 * Destroy ceph client
445 * Caller must hold rbd_client_list_lock.
447 static void rbd_client_release(struct kref *kref)
449 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451 dout("rbd_release_client %p\n", rbdc);
452 spin_lock(&rbd_client_list_lock);
453 list_del(&rbdc->node);
454 spin_unlock(&rbd_client_list_lock);
456 ceph_destroy_client(rbdc->client);
457 kfree(rbdc->rbd_opts);
462 * Drop reference to ceph client node. If it's not referenced anymore, release
465 static void rbd_put_client(struct rbd_device *rbd_dev)
467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 rbd_dev->rbd_client = NULL;
472 * Destroy requests collection
474 static void rbd_coll_release(struct kref *kref)
476 struct rbd_req_coll *coll =
477 container_of(kref, struct rbd_req_coll, kref);
479 dout("rbd_coll_release %p\n", coll);
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
485 return !memcmp(&ondisk->text,
486 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
490 * Create a new header structure, translate header format from the on-disk
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494 struct rbd_image_header_ondisk *ondisk,
499 if (!rbd_dev_ondisk_valid(ondisk))
502 snap_count = le32_to_cpu(ondisk->snap_count);
503 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
506 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507 snap_count * sizeof(u64),
512 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514 header->snap_names = kmalloc(header->snap_names_len,
516 if (!header->snap_names)
518 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
520 if (!header->snap_sizes)
523 header->snap_names = NULL;
524 header->snap_sizes = NULL;
527 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
529 if (!header->object_prefix)
532 memcpy(header->object_prefix, ondisk->block_name,
533 sizeof(ondisk->block_name));
534 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
536 header->image_size = le64_to_cpu(ondisk->image_size);
537 header->obj_order = ondisk->options.order;
538 header->crypt_type = ondisk->options.crypt_type;
539 header->comp_type = ondisk->options.comp_type;
541 atomic_set(&header->snapc->nref, 1);
542 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
543 header->snapc->num_snaps = snap_count;
544 header->total_snaps = snap_count;
546 if (snap_count && allocated_snaps == snap_count) {
547 for (i = 0; i < snap_count; i++) {
548 header->snapc->snaps[i] =
549 le64_to_cpu(ondisk->snaps[i].id);
550 header->snap_sizes[i] =
551 le64_to_cpu(ondisk->snaps[i].image_size);
554 /* copy snapshot names */
555 memcpy(header->snap_names, &ondisk->snaps[i],
556 header->snap_names_len);
562 kfree(header->snap_sizes);
564 kfree(header->snap_names);
566 kfree(header->snapc);
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
574 char *p = header->snap_names;
576 for (i = 0; i < header->total_snaps; i++) {
577 if (!strcmp(snap_name, p)) {
579 /* Found it. Pass back its id and/or size */
582 *seq = header->snapc->snaps[i];
584 *size = header->snap_sizes[i];
587 p += strlen(p) + 1; /* Skip ahead to the next name */
592 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
596 down_write(&rbd_dev->header_rwsem);
598 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
599 sizeof (RBD_SNAP_HEAD_NAME))) {
600 rbd_dev->snap_id = CEPH_NOSNAP;
601 rbd_dev->snap_exists = false;
602 rbd_dev->read_only = 0;
604 *size = rbd_dev->header.image_size;
608 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
612 rbd_dev->snap_id = snap_id;
613 rbd_dev->snap_exists = true;
614 rbd_dev->read_only = 1;
619 up_write(&rbd_dev->header_rwsem);
623 static void rbd_header_free(struct rbd_image_header *header)
625 kfree(header->object_prefix);
626 kfree(header->snap_sizes);
627 kfree(header->snap_names);
628 ceph_put_snap_context(header->snapc);
632 * get the actual striped segment name, offset and length
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635 const char *object_prefix,
637 char *seg_name, u64 *segofs)
639 u64 seg = ofs >> header->obj_order;
642 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643 "%s.%012llx", object_prefix, seg);
645 ofs = ofs & ((1 << header->obj_order) - 1);
646 len = min_t(u64, len, (1 << header->obj_order) - ofs);
654 static int rbd_get_num_segments(struct rbd_image_header *header,
657 u64 start_seg = ofs >> header->obj_order;
658 u64 end_seg = (ofs + len - 1) >> header->obj_order;
659 return end_seg - start_seg + 1;
663 * returns the size of an object in the image
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
667 return 1 << header->obj_order;
674 static void bio_chain_put(struct bio *chain)
680 chain = chain->bi_next;
686 * zeros a bio chain, starting at specific offset
688 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 bio_for_each_segment(bv, chain, i) {
698 if (pos + bv->bv_len > start_ofs) {
699 int remainder = max(start_ofs - pos, 0);
700 buf = bvec_kmap_irq(bv, &flags);
701 memset(buf + remainder, 0,
702 bv->bv_len - remainder);
703 bvec_kunmap_irq(buf, &flags);
708 chain = chain->bi_next;
713 * bio_chain_clone - clone a chain of bios up to a certain length.
714 * might return a bio_pair that will need to be released.
716 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717 struct bio_pair **bp,
718 int len, gfp_t gfpmask)
720 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
724 bio_pair_release(*bp);
728 while (old_chain && (total < len)) {
729 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
733 if (total + old_chain->bi_size > len) {
737 * this split can only happen with a single paged bio,
738 * split_bio will BUG_ON if this is not the case
740 dout("bio_chain_clone split! total=%d remaining=%d"
742 total, len - total, old_chain->bi_size);
744 /* split the bio. We'll release it either in the next
745 call, or it will have to be released outside */
746 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
750 __bio_clone(tmp, &bp->bio1);
754 __bio_clone(tmp, old_chain);
755 *next = old_chain->bi_next;
759 gfpmask &= ~__GFP_WAIT;
763 new_chain = tail = tmp;
768 old_chain = old_chain->bi_next;
770 total += tmp->bi_size;
776 tail->bi_next = NULL;
783 dout("bio_chain_clone with err\n");
784 bio_chain_put(new_chain);
789 * helpers for osd request op vectors.
791 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
792 int opcode, u32 payload_len)
794 struct ceph_osd_req_op *ops;
796 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
803 * op extent offset and length will be set later on
804 * in calc_raw_layout()
806 ops[0].payload_len = payload_len;
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
816 static void rbd_coll_end_req_index(struct request *rq,
817 struct rbd_req_coll *coll,
821 struct request_queue *q;
824 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
825 coll, index, ret, (unsigned long long) len);
831 blk_end_request(rq, ret, len);
837 spin_lock_irq(q->queue_lock);
838 coll->status[index].done = 1;
839 coll->status[index].rc = ret;
840 coll->status[index].bytes = len;
841 max = min = coll->num_done;
842 while (max < coll->total && coll->status[max].done)
845 for (i = min; i<max; i++) {
846 __blk_end_request(rq, coll->status[i].rc,
847 coll->status[i].bytes);
849 kref_put(&coll->kref, rbd_coll_release);
851 spin_unlock_irq(q->queue_lock);
854 static void rbd_coll_end_req(struct rbd_request *req,
857 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
861 * Send ceph osd request
863 static int rbd_do_request(struct request *rq,
864 struct rbd_device *rbd_dev,
865 struct ceph_snap_context *snapc,
867 const char *object_name, u64 ofs, u64 len,
872 struct ceph_osd_req_op *ops,
873 struct rbd_req_coll *coll,
875 void (*rbd_cb)(struct ceph_osd_request *req,
876 struct ceph_msg *msg),
877 struct ceph_osd_request **linger_req,
880 struct ceph_osd_request *req;
881 struct ceph_file_layout *layout;
884 struct timespec mtime = CURRENT_TIME;
885 struct rbd_request *req_data;
886 struct ceph_osd_request_head *reqhead;
887 struct ceph_osd_client *osdc;
889 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
892 rbd_coll_end_req_index(rq, coll, coll_index,
898 req_data->coll = coll;
899 req_data->coll_index = coll_index;
902 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
903 (unsigned long long) ofs, (unsigned long long) len);
905 osdc = &rbd_dev->rbd_client->client->osdc;
906 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907 false, GFP_NOIO, pages, bio);
913 req->r_callback = rbd_cb;
917 req_data->pages = pages;
920 req->r_priv = req_data;
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
925 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
926 req->r_oid_len = strlen(req->r_oid);
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
934 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
937 ceph_osdc_build_request(req, ofs, &len,
941 req->r_oid, req->r_oid_len);
944 ceph_osdc_set_request_linger(osdc, req);
948 ret = ceph_osdc_start_request(osdc, req, false);
953 ret = ceph_osdc_wait_request(osdc, req);
955 *ver = le64_to_cpu(req->r_reassert_version.version);
956 dout("reassert_ver=%llu\n",
958 le64_to_cpu(req->r_reassert_version.version));
959 ceph_osdc_put_request(req);
964 bio_chain_put(req_data->bio);
965 ceph_osdc_put_request(req);
967 rbd_coll_end_req(req_data, ret, len);
973 * Ceph osd op callback
975 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
977 struct rbd_request *req_data = req->r_priv;
978 struct ceph_osd_reply_head *replyhead;
979 struct ceph_osd_op *op;
985 replyhead = msg->front.iov_base;
986 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
987 op = (void *)(replyhead + 1);
988 rc = le32_to_cpu(replyhead->result);
989 bytes = le64_to_cpu(op->extent.length);
990 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
992 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
993 (unsigned long long) bytes, read_op, (int) rc);
995 if (rc == -ENOENT && read_op) {
996 zero_bio_chain(req_data->bio, 0);
998 } else if (rc == 0 && read_op && bytes < req_data->len) {
999 zero_bio_chain(req_data->bio, bytes);
1000 bytes = req_data->len;
1003 rbd_coll_end_req(req_data, rc, bytes);
1006 bio_chain_put(req_data->bio);
1008 ceph_osdc_put_request(req);
/*
 * Minimal osd request completion callback: just drop the request
 * reference.  Used for fire-and-forget requests (e.g. notify acks)
 * that carry no rbd_request bookkeeping.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1018 * Do a synchronous ceph osd operation
1020 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1021 struct ceph_snap_context *snapc,
1025 struct ceph_osd_req_op *orig_ops,
1026 const char *object_name,
1029 struct ceph_osd_request **linger_req,
1033 struct page **pages;
1035 struct ceph_osd_req_op *ops = orig_ops;
1038 num_pages = calc_pages_for(ofs , len);
1039 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1041 return PTR_ERR(pages);
1044 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1046 ops = rbd_create_rw_ops(1, opcode, payload_len);
1050 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1051 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1057 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1058 object_name, ofs, len, NULL,
1068 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1069 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1073 rbd_destroy_ops(ops);
1075 ceph_release_page_vector(pages, num_pages);
1080 * Do an asynchronous ceph osd operation
1082 static int rbd_do_op(struct request *rq,
1083 struct rbd_device *rbd_dev,
1084 struct ceph_snap_context *snapc,
1086 int opcode, int flags,
1089 struct rbd_req_coll *coll,
1096 struct ceph_osd_req_op *ops;
1099 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1103 seg_len = rbd_get_segment(&rbd_dev->header,
1104 rbd_dev->header.object_prefix,
1106 seg_name, &seg_ofs);
1108 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1111 ops = rbd_create_rw_ops(1, opcode, payload_len);
1115 /* we've taken care of segment sizes earlier when we
1116 cloned the bios. We should never have a segment
1117 truncated at this point */
1118 BUG_ON(seg_len < len);
1120 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121 seg_name, seg_ofs, seg_len,
1127 rbd_req_cb, 0, NULL);
1129 rbd_destroy_ops(ops);
1136 * Request async osd write
1138 static int rbd_req_write(struct request *rq,
1139 struct rbd_device *rbd_dev,
1140 struct ceph_snap_context *snapc,
1143 struct rbd_req_coll *coll,
1146 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1148 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1149 ofs, len, bio, coll, coll_index);
1153 * Request async osd read
1155 static int rbd_req_read(struct request *rq,
1156 struct rbd_device *rbd_dev,
1160 struct rbd_req_coll *coll,
1163 return rbd_do_op(rq, rbd_dev, NULL,
1167 ofs, len, bio, coll, coll_index);
1171 * Request sync osd read
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1175 const char *object_name,
1180 return rbd_req_sync_op(rbd_dev, NULL,
1185 object_name, ofs, len, buf, NULL, ver);
1189 * Request sync osd watch
1191 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1195 struct ceph_osd_req_op *ops;
1198 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1202 ops[0].watch.ver = cpu_to_le64(ver);
1203 ops[0].watch.cookie = notify_id;
1204 ops[0].watch.flag = 0;
1206 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1207 rbd_dev->header_name, 0, 0, NULL,
1212 rbd_simple_req_cb, 0, NULL);
1214 rbd_destroy_ops(ops);
1218 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1220 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1227 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1228 rbd_dev->header_name, (unsigned long long) notify_id,
1229 (unsigned int) opcode);
1230 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231 rc = __rbd_refresh_header(rbd_dev);
1232 hver = rbd_dev->header.obj_version;
1233 mutex_unlock(&ctl_mutex);
1235 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236 " update snaps: %d\n", rbd_dev->major, rc);
1238 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1242 * Request sync osd watch
1244 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1246 struct ceph_osd_req_op *ops;
1247 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1250 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1254 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1255 (void *)rbd_dev, &rbd_dev->watch_event);
1259 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1260 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1261 ops[0].watch.flag = 1;
1263 ret = rbd_req_sync_op(rbd_dev, NULL,
1266 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268 rbd_dev->header_name,
1270 &rbd_dev->watch_request, NULL);
1275 rbd_destroy_ops(ops);
1279 ceph_osdc_cancel_event(rbd_dev->watch_event);
1280 rbd_dev->watch_event = NULL;
1282 rbd_destroy_ops(ops);
1287 * Request sync osd unwatch
1289 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1291 struct ceph_osd_req_op *ops;
1294 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1298 ops[0].watch.ver = 0;
1299 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1300 ops[0].watch.flag = 0;
1302 ret = rbd_req_sync_op(rbd_dev, NULL,
1305 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1307 rbd_dev->header_name,
1308 0, 0, NULL, NULL, NULL);
1311 rbd_destroy_ops(ops);
1312 ceph_osdc_cancel_event(rbd_dev->watch_event);
1313 rbd_dev->watch_event = NULL;
1317 struct rbd_notify_info {
1318 struct rbd_device *rbd_dev;
1321 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1323 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1327 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1328 rbd_dev->header_name, (unsigned long long) notify_id,
1329 (unsigned int) opcode);
1333 * Request sync osd notify
1335 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1337 struct ceph_osd_req_op *ops;
1338 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1339 struct ceph_osd_event *event;
1340 struct rbd_notify_info info;
1341 int payload_len = sizeof(u32) + sizeof(u32);
1344 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1348 info.rbd_dev = rbd_dev;
1350 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1351 (void *)&info, &event);
1355 ops[0].watch.ver = 1;
1356 ops[0].watch.flag = 1;
1357 ops[0].watch.cookie = event->cookie;
1358 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1359 ops[0].watch.timeout = 12;
1361 ret = rbd_req_sync_op(rbd_dev, NULL,
1364 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1366 rbd_dev->header_name,
1367 0, 0, NULL, NULL, NULL);
1371 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1372 dout("ceph_osdc_wait_event returned %d\n", ret);
1373 rbd_destroy_ops(ops);
1377 ceph_osdc_cancel_event(event);
1379 rbd_destroy_ops(ops);
1384 * Request sync osd read
1386 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1387 const char *object_name,
1388 const char *class_name,
1389 const char *method_name,
1394 struct ceph_osd_req_op *ops;
1395 int class_name_len = strlen(class_name);
1396 int method_name_len = strlen(method_name);
1399 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1400 class_name_len + method_name_len + len);
1404 ops[0].cls.class_name = class_name;
1405 ops[0].cls.class_len = (__u8) class_name_len;
1406 ops[0].cls.method_name = method_name;
1407 ops[0].cls.method_len = (__u8) method_name_len;
1408 ops[0].cls.argc = 0;
1409 ops[0].cls.indata = data;
1410 ops[0].cls.indata_len = len;
1412 ret = rbd_req_sync_op(rbd_dev, NULL,
1415 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1417 object_name, 0, 0, NULL, NULL, ver);
1419 rbd_destroy_ops(ops);
1421 dout("cls_exec returned %d\n", ret);
1425 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1427 struct rbd_req_coll *coll =
1428 kzalloc(sizeof(struct rbd_req_coll) +
1429 sizeof(struct rbd_req_status) * num_reqs,
1434 coll->total = num_reqs;
1435 kref_init(&coll->kref);
1440 * block device queue callback
1442 static void rbd_rq_fn(struct request_queue *q)
1444 struct rbd_device *rbd_dev = q->queuedata;
1446 struct bio_pair *bp = NULL;
1448 while ((rq = blk_fetch_request(q))) {
1450 struct bio *rq_bio, *next_bio = NULL;
1455 int num_segs, cur_seg = 0;
1456 struct rbd_req_coll *coll;
1457 struct ceph_snap_context *snapc;
1459 /* peek at request from block layer */
1463 dout("fetched request\n");
1465 /* filter out block requests we don't understand */
1466 if ((rq->cmd_type != REQ_TYPE_FS)) {
1467 __blk_end_request_all(rq, 0);
1471 /* deduce our operation (read, write) */
1472 do_write = (rq_data_dir(rq) == WRITE);
1474 size = blk_rq_bytes(rq);
1475 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1477 if (do_write && rbd_dev->read_only) {
1478 __blk_end_request_all(rq, -EROFS);
1482 spin_unlock_irq(q->queue_lock);
1484 down_read(&rbd_dev->header_rwsem);
1486 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1487 up_read(&rbd_dev->header_rwsem);
1488 dout("request for non-existent snapshot");
1489 spin_lock_irq(q->queue_lock);
1490 __blk_end_request_all(rq, -ENXIO);
1494 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1496 up_read(&rbd_dev->header_rwsem);
1498 dout("%s 0x%x bytes at 0x%llx\n",
1499 do_write ? "write" : "read",
1500 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1502 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1503 coll = rbd_alloc_coll(num_segs);
1505 spin_lock_irq(q->queue_lock);
1506 __blk_end_request_all(rq, -ENOMEM);
1507 ceph_put_snap_context(snapc);
1512 /* a bio clone to be passed down to OSD req */
1513 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1514 op_size = rbd_get_segment(&rbd_dev->header,
1515 rbd_dev->header.object_prefix,
1518 kref_get(&coll->kref);
1519 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1520 op_size, GFP_ATOMIC);
1522 rbd_coll_end_req_index(rq, coll, cur_seg,
1528 /* init OSD command: write or read */
1530 rbd_req_write(rq, rbd_dev,
1536 rbd_req_read(rq, rbd_dev,
1549 kref_put(&coll->kref, rbd_coll_release);
1552 bio_pair_release(bp);
1553 spin_lock_irq(q->queue_lock);
1555 ceph_put_snap_context(snapc);
1560 * a queue callback. Makes sure that we don't create a bio that spans across
1561 * multiple osd objects. One exception would be with a single page bios,
1562 * which we handle later at bio_chain_clone
1564 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1565 struct bio_vec *bvec)
1567 struct rbd_device *rbd_dev = q->queuedata;
1568 unsigned int chunk_sectors;
1570 unsigned int bio_sectors;
1573 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1574 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1575 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1577 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1578 + bio_sectors)) << SECTOR_SHIFT;
1580 max = 0; /* bio_add cannot handle a negative return */
1581 if (max <= bvec->bv_len && bio_sectors == 0)
1582 return bvec->bv_len;
1586 static void rbd_free_disk(struct rbd_device *rbd_dev)
1588 struct gendisk *disk = rbd_dev->disk;
1593 rbd_header_free(&rbd_dev->header);
1595 if (disk->flags & GENHD_FL_UP)
1598 blk_cleanup_queue(disk->queue);
1603 * reload the ondisk the header
1605 static int rbd_read_header(struct rbd_device *rbd_dev,
1606 struct rbd_image_header *header)
1609 struct rbd_image_header_ondisk *dh;
1615 * First reads the fixed-size header to determine the number
1616 * of snapshots, then re-reads it, along with all snapshot
1617 * records as well as their stored names.
1621 dh = kmalloc(len, GFP_KERNEL);
1625 rc = rbd_req_sync_read(rbd_dev,
1627 rbd_dev->header_name,
1633 rc = rbd_header_from_disk(header, dh, snap_count);
1636 pr_warning("unrecognized header format"
1638 rbd_dev->image_name);
1642 if (snap_count == header->total_snaps)
1645 snap_count = header->total_snaps;
1646 len = sizeof (*dh) +
1647 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1648 header->snap_names_len;
1650 rbd_header_free(header);
1653 header->obj_version = ver;
1663 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1664 const char *snap_name,
1667 int name_len = strlen(snap_name);
1671 struct ceph_mon_client *monc;
1673 /* we should create a snapshot only if we're pointing at the head */
1674 if (rbd_dev->snap_id != CEPH_NOSNAP)
1677 monc = &rbd_dev->rbd_client->client->monc;
1678 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1679 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1683 data = kmalloc(name_len + 16, gfp_flags);
1688 e = data + name_len + 16;
1690 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1691 ceph_encode_64_safe(&p, e, new_snapid, bad);
1693 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1695 data, p - data, NULL);
1699 return ret < 0 ? ret : 0;
1704 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1706 struct rbd_snap *snap;
1707 struct rbd_snap *next;
1709 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1710 __rbd_remove_snap_dev(snap);
/*
 * __rbd_refresh_header() -- re-read the image header from the OSDs
 * and swap the fresh fields into rbd_dev->header, then rebuild the
 * snapshot device list.  Called under ctl_mutex; takes header_rwsem
 * for writing while the in-core header is replaced so readers (e.g.
 * rbd_size_show) see a consistent view.
 */
1714 * re-read the ondisk header and update the in-core copy
1716 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1719 struct rbd_image_header h;
1721 ret = rbd_read_header(rbd_dev, &h);
1725 down_write(&rbd_dev->header_rwsem);
/* Resize the block device only when mapped at the head; a snapshot
 * mapping keeps its frozen size. */
1728 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1729 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1731 dout("setting size to %llu sectors", (unsigned long long) size);
1732 set_capacity(rbd_dev->disk, size);
/* Free the old variable-size pieces, then adopt the new ones. */
1735 /* rbd_dev->header.object_prefix shouldn't change */
1736 kfree(rbd_dev->header.snap_sizes);
1737 kfree(rbd_dev->header.snap_names);
1738 /* osd requests may still refer to snapc */
1739 ceph_put_snap_context(rbd_dev->header.snapc);
1741 rbd_dev->header.obj_version = h.obj_version;
1742 rbd_dev->header.image_size = h.image_size;
1743 rbd_dev->header.total_snaps = h.total_snaps;
1744 rbd_dev->header.snapc = h.snapc;
1745 rbd_dev->header.snap_names = h.snap_names;
1746 rbd_dev->header.snap_names_len = h.snap_names_len;
1747 rbd_dev->header.snap_sizes = h.snap_sizes;
1748 /* Free the extra copy of the object prefix */
1749 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1750 kfree(h.object_prefix);
1752 ret = __rbd_init_snaps_header(rbd_dev);
1754 up_write(&rbd_dev->header_rwsem);
/*
 * rbd_init_disk() -- set up the gendisk and request queue for a newly
 * added rbd device and announce its capacity.
 *
 * Reads the image header from the OSDs, builds the initial snapshot
 * list, resolves the mapped snapshot (yielding total_size), then
 * allocates the gendisk/request queue and sizes all queue limits to
 * one rbd object.  Error-handling paths, "out" labels, and the final
 * return are elided in this view.
 */
1759 static int rbd_init_disk(struct rbd_device *rbd_dev)
1761 struct gendisk *disk;
1762 struct request_queue *q;
1767 /* contact OSD, request size info about the object being mapped */
1768 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1772 /* no need to lock here, as rbd_dev is not registered yet */
1773 rc = __rbd_init_snaps_header(rbd_dev);
1777 rc = rbd_header_set_snap(rbd_dev, &total_size);
1781 /* create gendisk info */
1783 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1787 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1789 disk->major = rbd_dev->major;
1790 disk->first_minor = 0;
1791 disk->fops = &rbd_bd_ops;
1792 disk->private_data = rbd_dev;
1796 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1800 /* We use the default size, but let's be explicit about it. */
1801 blk_queue_physical_block_size(q, SECTOR_SIZE);
/* All I/O limits are one object so requests never straddle objects. */
1803 /* set io sizes to object size */
1804 segment_size = rbd_obj_bytes(&rbd_dev->header);
1805 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1806 blk_queue_max_segment_size(q, segment_size);
1807 blk_queue_io_min(q, segment_size);
1808 blk_queue_io_opt(q, segment_size);
1810 blk_queue_merge_bvec(q, rbd_merge_bvec);
1813 q->queuedata = rbd_dev;
1815 rbd_dev->disk = disk;
1818 /* finally, announce the disk to the world */
1819 set_capacity(disk, total_size / SECTOR_SIZE);
1822 pr_info("%s: added with size 0x%llx\n",
1823 disk->disk_name, (unsigned long long)total_size);
/* Map the struct device embedded in an rbd_device back to its owner. */
1836 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1838 return container_of(dev, struct rbd_device, dev);
/*
 * sysfs "size" attribute: current mapped size in bytes.  The gendisk
 * capacity (in 512-byte sectors) is sampled under header_rwsem so it
 * is coherent with a concurrent __rbd_refresh_header().
 */
1841 static ssize_t rbd_size_show(struct device *dev,
1842 struct device_attribute *attr, char *buf)
1844 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847 down_read(&rbd_dev->header_rwsem);
1848 size = get_capacity(rbd_dev->disk);
1849 up_read(&rbd_dev->header_rwsem);
1851 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1854 static ssize_t rbd_major_show(struct device *dev,
1855 struct device_attribute *attr, char *buf)
1857 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1859 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: ceph global id of our client instance,
 * formatted as "client<NNN>" to match ceph tooling conventions. */
1862 static ssize_t rbd_client_id_show(struct device *dev,
1863 struct device_attribute *attr, char *buf)
1865 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1867 return sprintf(buf, "client%lld\n",
1868 ceph_client_id(rbd_dev->rbd_client->client));
1871 static ssize_t rbd_pool_show(struct device *dev,
1872 struct device_attribute *attr, char *buf)
1874 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the image's pool. */
1879 static ssize_t rbd_pool_id_show(struct device *dev,
1880 struct device_attribute *attr, char *buf)
1882 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: the rbd image name. */
1887 static ssize_t rbd_name_show(struct device *dev,
1888 struct device_attribute *attr, char *buf)
1890 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1892 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap" attribute: the mapped snapshot's name
 * (RBD_SNAP_HEAD_NAME, "-", when mapped at the head). */
1895 static ssize_t rbd_snap_show(struct device *dev,
1896 struct device_attribute *attr,
1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" attribute (write-only): force a re-read of the
 * image header under ctl_mutex.  The return statement is elided in
 * this view; presumably returns the write count on success and a
 * negative errno otherwise -- confirm against the full source.
 */
1904 static ssize_t rbd_image_refresh(struct device *dev,
1905 struct device_attribute *attr,
1909 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1915 rc = __rbd_refresh_header(rbd_dev);
1919 mutex_unlock(&ctl_mutex);
/* Per-device sysfs attributes (exposed under /sys/bus/rbd/devices/<id>/).
 * The *_show/*_store handlers are defined above; refresh and
 * create_snap are write-only controls, the rest are read-only. */
1923 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1924 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1925 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1926 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1927 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1928 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1929 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1930 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1931 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* Attribute table registered as a group on each rbd device. */
1933 static struct attribute *rbd_attrs[] = {
1934 &dev_attr_size.attr,
1935 &dev_attr_major.attr,
1936 &dev_attr_client_id.attr,
1937 &dev_attr_pool.attr,
1938 &dev_attr_pool_id.attr,
1939 &dev_attr_name.attr,
1940 &dev_attr_current_snap.attr,
1941 &dev_attr_refresh.attr,
1942 &dev_attr_create_snap.attr,
1946 static struct attribute_group rbd_attr_group = {
1950 static const struct attribute_group *rbd_attr_groups[] = {
/* Release callback for the device type; body elided in this view --
 * actual teardown happens in rbd_dev_release() via dev->release. */
1955 static void rbd_sysfs_dev_release(struct device *dev)
1959 static struct device_type rbd_device_type = {
1961 .groups = rbd_attr_groups,
1962 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size" attribute on a snapshot device: the image size
 * (bytes) recorded at the time the snapshot was taken. */
1970 static ssize_t rbd_snap_size_show(struct device *dev,
1971 struct device_attribute *attr,
1974 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1976 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id" attribute on a snapshot device: the ceph snap id. */
1979 static ssize_t rbd_snap_id_show(struct device *dev,
1980 struct device_attribute *attr,
1983 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1985 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes, grouped onto each snap_<name> device. */
1988 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1989 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1991 static struct attribute *rbd_snap_attrs[] = {
1992 &dev_attr_snap_size.attr,
1993 &dev_attr_snap_id.attr,
1997 static struct attribute_group rbd_snap_attr_group = {
1998 .attrs = rbd_snap_attrs,
/* Release callback for a snapshot device; presumably frees the
 * rbd_snap and its name -- the body is elided in this view. */
2001 static void rbd_snap_dev_release(struct device *dev)
2003 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2008 static const struct attribute_group *rbd_snap_attr_groups[] = {
2009 &rbd_snap_attr_group,
2013 static struct device_type rbd_snap_device_type = {
2014 .groups = rbd_snap_attr_groups,
2015 .release = rbd_snap_dev_release,
/*
 * Unlink one snapshot from its device's snaps list and unregister its
 * sysfs device; the device release callback then frees the rbd_snap.
 * Callers hold ctl_mutex while manipulating the list.
 */
2018 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2020 list_del(&snap->node);
2021 device_unregister(&snap->dev);
/*
 * Register the sysfs device ("snap_<name>") for one snapshot under
 * the given parent (the rbd device).  Returns device_register()'s
 * result (the return is elided in this view).
 *
 * NOTE(review): the release callback is set both here (dev->release)
 * and via rbd_snap_device_type.release; the driver core prefers the
 * per-device pointer, so one of the two assignments is redundant --
 * confirm before removing either.
 */
2024 static int rbd_register_snap_dev(struct rbd_snap *snap,
2025 struct device *parent)
2027 struct device *dev = &snap->dev;
2030 dev->type = &rbd_snap_device_type;
2031 dev->parent = parent;
2032 dev->release = rbd_snap_dev_release;
2033 dev_set_name(dev, "snap_%s", snap->name);
2034 ret = device_register(dev);
/*
 * Allocate and populate an rbd_snap for snapshot index i (an index
 * into the header's snap_sizes/snapc->snaps arrays) with the given
 * name, registering its sysfs device when the parent rbd device is
 * already registered.  Returns the new rbd_snap or an ERR_PTR; the
 * error-unwind labels are elided in this view.
 */
2039 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2040 int i, const char *name)
2042 struct rbd_snap *snap;
2045 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2047 return ERR_PTR(-ENOMEM);
2050 snap->name = kstrdup(name, GFP_KERNEL);
2054 snap->size = rbd_dev->header.snap_sizes[i];
2055 snap->id = rbd_dev->header.snapc->snaps[i];
/* During initial setup the parent isn't registered yet; in that case
 * rbd_bus_add_dev() registers the snap devices later. */
2056 if (device_is_registered(&rbd_dev->dev)) {
2057 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2068 return ERR_PTR(ret);
/*
 * Step backward to the previous entry in a '\0'-delimited list of
 * snapshot names; 'start' is the beginning of the list.  The body is
 * mostly elided in this view -- presumably returns NULL when there is
 * no previous name (the guard below rejects positions too close to
 * the start to contain one).
 */
2072 * search for the previous snap in a null delimited string list
2074 const char *rbd_prev_snap_name(const char *name, const char *start)
2076 if (name < start + 2)
/*
 * Reconcile the in-core snapshot device list with the freshly read
 * header.  Callers hold ctl_mutex (refresh path) or own an as-yet
 * unregistered rbd_dev (init path).  Loop bodies and several closing
 * braces are elided in this view.
 */
2089 * compare the old list of snapshots that we have to what's in the header
2090 * and update it accordingly. Note that the header holds the snapshots
2091 * in a reverse order (from newest to oldest) and we need to go from
2092 * older to new so that we don't get a duplicate snap name when
2093 * doing the process (e.g., removed snapshot and recreated a new
2094 * one with the same name).
2096 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2098 const char *name, *first_name;
2099 int i = rbd_dev->header.total_snaps;
2100 struct rbd_snap *snap, *old_snap = NULL;
2101 struct list_head *p, *n;
/* names are packed oldest-first; start one past the end and walk back */
2103 first_name = rbd_dev->header.snap_names;
2104 name = first_name + rbd_dev->header.snap_names_len;
2106 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2109 old_snap = list_entry(p, struct rbd_snap, node);
2112 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2114 if (!i || old_snap->id < cur_id) {
2116 * old_snap->id was skipped, thus was
2117 * removed. If this rbd_dev is mapped to
2118 * the removed snapshot, record that it no
2119 * longer exists, to prevent further I/O.
2121 if (rbd_dev->snap_id == old_snap->id)
2122 rbd_dev->snap_exists = false;
2123 __rbd_remove_snap_dev(old_snap);
2126 if (old_snap->id == cur_id) {
2127 /* we have this snapshot already */
2129 name = rbd_prev_snap_name(name, first_name);
2133 i--, name = rbd_prev_snap_name(name, first_name)) {
2138 cur_id = rbd_dev->header.snapc->snaps[i];
2139 /* snapshot removal? handle it above */
2140 if (cur_id >= old_snap->id)
2142 /* a new snapshot */
2143 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2145 return PTR_ERR(snap);
2147 /* note that we add it backward so using n and not p */
2148 list_add(&snap->node, n);
2152 /* we're done going over the old snap list, just add what's left */
2153 for (; i > 0; i--) {
2154 name = rbd_prev_snap_name(name, first_name);
2159 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2161 return PTR_ERR(snap);
2162 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus in sysfs, then register a
 * device for each snapshot already on the snaps list (built before
 * the parent existed, see __rbd_add_snap_dev()).  Error labels and
 * the return are elided in this view.
 */
2168 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2172 struct rbd_snap *snap;
2174 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2175 dev = &rbd_dev->dev;
2177 dev->bus = &rbd_bus_type;
2178 dev->type = &rbd_device_type;
2179 dev->parent = &rbd_root_dev;
/* rbd_dev_release() does the real teardown when the last ref drops */
2180 dev->release = rbd_dev_release;
2181 dev_set_name(dev, "%d", rbd_dev->dev_id);
2182 ret = device_register(dev);
2186 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2187 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2192 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device from sysfs; once the last reference is
 * dropped, rbd_dev_release() performs the remaining cleanup. */
2196 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2198 device_unregister(&rbd_dev->dev);
/*
 * Set up the header-object watch for change notifications.  If the
 * watch registration reports -ERANGE (header grew since we read it),
 * refresh the in-core header under ctl_mutex and retry.  The loop
 * opening and final return are elided in this view.
 */
2201 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2206 ret = rbd_req_sync_watch(rbd_dev);
2207 if (ret == -ERANGE) {
2208 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2209 rc = __rbd_refresh_header(rbd_dev);
2210 mutex_unlock(&ctl_mutex);
2214 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 and are only
 * reused after rbd_id_put() recomputes the maximum. */
2219 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2222 * Get a unique rbd identifier for the given new rbd_dev, and add
2223 * the rbd_dev to the global list. The minimum rbd id is 1.
2225 static void rbd_id_get(struct rbd_device *rbd_dev)
2227 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2229 spin_lock(&rbd_dev_list_lock);
2230 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2231 spin_unlock(&rbd_dev_list_lock);
2235 * Remove an rbd_dev from the global list, and record that its
2236 * identifier is no longer in use.
2238 static void rbd_id_put(struct rbd_device *rbd_dev)
2240 struct list_head *tmp;
2241 int rbd_id = rbd_dev->dev_id;
2246 spin_lock(&rbd_dev_list_lock);
2247 list_del_init(&rbd_dev->node);
2250 * If the id being "put" is not the current maximum, there
2251 * is nothing special we need to do.
2253 if (rbd_id != atomic64_read(&rbd_id_max)) {
2254 spin_unlock(&rbd_dev_list_lock);
2259 * We need to update the current maximum id. Search the
2260 * list to find out what it is. We're more likely to find
2261 * the maximum at the end, so search the list backward.
2264 list_for_each_prev(tmp, &rbd_dev_list) {
2265 struct rbd_device *rbd_dev;
2267 rbd_dev = list_entry(tmp, struct rbd_device, node);
2268 if (rbd_id > max_id)
2271 spin_unlock(&rbd_dev_list_lock);
2274 * The max id could have been updated by rbd_id_get(), in
2275 * which case it now accurately reflects the new maximum.
2276 * Be careful not to overwrite the maximum value in that
2279 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * Skip any leading whitespace at *buf and return the length of the
 * token (run of non-whitespace characters) found there; 0 means the
 * string is exhausted.  On return *buf points at the first
 * non-whitespace character.  The whitespace set matches isspace() in
 * the "C"/"POSIX" locales; *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* isspace() characters in the "C" and "POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);

	return strcspn(*buf, spaces);
}
/*
 * Find the next whitespace-delimited token in *buf and, if it fits in
 * token_size bytes (including the terminator), copy it into token[]
 * with a guaranteed '\0' terminator.  *buf must be '\0'-terminated on
 * entry.
 *
 * Returns the token length (excluding '\0'): 0 means no token was
 * found, and a value >= token_size means the token did not fit and
 * token[] was left untouched.  In every case *buf is advanced past
 * the end of the found token.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	/* Same whitespace set as next_token(): C-locale isspace(). */
	static const char spaces[] = " \f\n\r\t\v";
	size_t len;

	*buf += strspn(*buf, spaces);	/* skip to start of token */
	len = strcspn(*buf, spaces);	/* measure the token */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2332 * Finds the next token in *buf, dynamically allocates a buffer big
2333 * enough to hold a copy of it, and copies the token into the new
2334 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2335 * that a duplicate buffer is created even for a zero-length token.
2337 * Returns a pointer to the newly-allocated duplicate, or a null
2338 * pointer if memory for the duplicate was not available. If
2339 * the lenp argument is a non-null pointer, the length of the token
2340 * (not including the '\0') is returned in *lenp.
2342 * If successful, the *buf pointer will be updated to point beyond
2343 * the end of the found token.
2345 * Note: uses GFP_KERNEL for allocation.
2347 static inline char *dup_token(const char **buf, size_t *lenp)
2352 len = next_token(buf);
2353 dup = kmalloc(len + 1, GFP_KERNEL);
/* Copy the token and terminate it; *buf advance, the lenp store, and
 * the return are elided in this view. */
2357 memcpy(dup, *buf, len);
2358 *(dup + len) = '\0';
/*
 * Parse the space-separated "add" command:
 *   <mon_addrs> <options> <pool> <image> [<snap>]
 * The caller's freshly-zeroed rbd_dev gets its pool_name, image_name,
 * image_name_len, header_name and snap_name fields filled in; the
 * monitor address span is returned via mon_addrs/mon_addrs_size and
 * the options token is copied into the caller's buffer.  Error labels
 * and several checks are elided in this view; on failure everything
 * allocated here is freed.
 */
2368 * This fills in the pool_name, image_name, image_name_len, snap_name,
2369 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2370 * on the list of monitor addresses and other options provided via
2373 * Note: rbd_dev is assumed to have been initially zero-filled.
2375 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2377 const char **mon_addrs,
2378 size_t *mon_addrs_size,
2380 size_t options_size)
2385 /* The first four tokens are required */
2387 len = next_token(&buf)
2390 *mon_addrs_size = len + 1;
2395 len = copy_token(&buf, options, options_size);
2396 if (!len || len >= options_size)
2400 rbd_dev->pool_name = dup_token(&buf, NULL);
2401 if (!rbd_dev->pool_name)
2404 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2405 if (!rbd_dev->image_name)
2408 /* Create the name of the header object */
2410 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2411 + sizeof (RBD_SUFFIX),
2413 if (!rbd_dev->header_name)
2415 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2418 * The snapshot name is optional. If none is supplied,
2419 * we use the default value.
2421 rbd_dev->snap_name = dup_token(&buf, &len);
2422 if (!rbd_dev->snap_name)
2425 /* Replace the empty name with the default */
2426 kfree(rbd_dev->snap_name);
2428 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2429 if (!rbd_dev->snap_name)
2432 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2433 sizeof (RBD_SNAP_HEAD_NAME));
/* Error unwind: free in reverse order of allocation. */
2439 kfree(rbd_dev->header_name);
2440 kfree(rbd_dev->image_name);
2441 kfree(rbd_dev->pool_name);
2442 rbd_dev->pool_name = NULL;
/*
 * rbd_add() -- bus-level "add" store: parse the user's command,
 * connect to the cluster, allocate an id/major, register the device
 * in sysfs and set up the disk and header watch.  After the sysfs
 * device is registered, error cleanup is delegated to the release
 * path; before that, the explicit err_out_* unwind labels (several of
 * which are elided in this view) undo each step.
 */
2447 static ssize_t rbd_add(struct bus_type *bus,
2452 struct rbd_device *rbd_dev = NULL;
2453 const char *mon_addrs = NULL;
2454 size_t mon_addrs_size = 0;
2455 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the mapped device;
 * dropped in rbd_dev_release() (or on the error path below). */
2458 if (!try_module_get(THIS_MODULE))
2461 options = kmalloc(count, GFP_KERNEL);
2464 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2468 /* static rbd_device initialization */
2469 spin_lock_init(&rbd_dev->lock);
2470 INIT_LIST_HEAD(&rbd_dev->node);
2471 INIT_LIST_HEAD(&rbd_dev->snaps);
2472 init_rwsem(&rbd_dev->header_rwsem);
2474 /* generate unique id: find highest unique id, add one */
2475 rbd_id_get(rbd_dev);
2477 /* Fill in the device name, now that we have its id. */
2478 BUILD_BUG_ON(DEV_NAME_LEN
2479 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2480 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2482 /* parse add command */
2483 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2488 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2490 if (IS_ERR(rbd_dev->rbd_client)) {
2491 rc = PTR_ERR(rbd_dev->rbd_client);
/* pick the pool: resolve the parsed pool name to its id */
2496 osdc = &rbd_dev->rbd_client->client->osdc;
2497 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2499 goto err_out_client;
2500 rbd_dev->pool_id = rc;
2502 /* register our block device */
2503 rc = register_blkdev(0, rbd_dev->name);
2505 goto err_out_client;
2506 rbd_dev->major = rc;
2508 rc = rbd_bus_add_dev(rbd_dev);
2510 goto err_out_blkdev;
2513 * At this point cleanup in the event of an error is the job
2514 * of the sysfs code (initiated by rbd_bus_del_dev()).
2516 * Set up and announce blkdev mapping.
2518 rc = rbd_init_disk(rbd_dev);
2522 rc = rbd_init_watch_dev(rbd_dev);
2529 /* this will also clean up rest of rbd_dev stuff */
2531 rbd_bus_del_dev(rbd_dev);
/* error unwind for failures before the sysfs device existed */
2536 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2538 rbd_put_client(rbd_dev);
2540 if (rbd_dev->pool_name) {
2541 kfree(rbd_dev->snap_name);
2542 kfree(rbd_dev->header_name);
2543 kfree(rbd_dev->image_name);
2544 kfree(rbd_dev->pool_name);
2546 rbd_id_put(rbd_dev);
2551 dout("Error adding device %s\n", buf);
2552 module_put(THIS_MODULE);
2554 return (ssize_t) rc;
/*
 * Look up a mapped rbd device by its numeric id under
 * rbd_dev_list_lock.  Returns the device, or (presumably -- the tail
 * of the function is elided in this view) NULL when no device has
 * that id.
 */
2557 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2559 struct list_head *tmp;
2560 struct rbd_device *rbd_dev;
2562 spin_lock(&rbd_dev_list_lock);
2563 list_for_each(tmp, &rbd_dev_list) {
2564 rbd_dev = list_entry(tmp, struct rbd_device, node);
2565 if (rbd_dev->dev_id == dev_id) {
2566 spin_unlock(&rbd_dev_list_lock);
2570 spin_unlock(&rbd_dev_list_lock);
/*
 * Device release callback, invoked by the driver core when the last
 * reference to the rbd device is dropped (after rbd_bus_del_dev()).
 * Tears down the header watch, the cluster client reference, the
 * gendisk/major, all strings allocated at parse time, the device id,
 * and finally the module reference taken in rbd_add().
 */
2574 static void rbd_dev_release(struct device *dev)
2576 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch request before unwatching the header. */
2578 if (rbd_dev->watch_request) {
2579 struct ceph_client *client = rbd_dev->rbd_client->client;
2581 ceph_osdc_unregister_linger_request(&client->osdc,
2582 rbd_dev->watch_request);
2584 if (rbd_dev->watch_event)
2585 rbd_req_sync_unwatch(rbd_dev);
2587 rbd_put_client(rbd_dev);
2589 /* clean up and free blkdev */
2590 rbd_free_disk(rbd_dev);
2591 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2593 /* done with the id, and with the rbd_dev */
2594 kfree(rbd_dev->snap_name);
2595 kfree(rbd_dev->header_name);
2596 kfree(rbd_dev->pool_name);
2597 kfree(rbd_dev->image_name);
2598 rbd_id_put(rbd_dev);
2601 /* release module ref */
2602 module_put(THIS_MODULE);
/*
 * Bus-level "remove" store: parse the target device id from the user
 * buffer, look the device up under ctl_mutex, tear down its snapshot
 * devices, and unregister it (the release callback does the rest).
 * The not-found path and final return are elided in this view.
 */
2605 static ssize_t rbd_remove(struct bus_type *bus,
2609 struct rbd_device *rbd_dev = NULL;
2614 rc = strict_strtoul(buf, 10, &ul);
2618 /* convert to int; abort if we lost anything in the conversion */
2619 target_id = (int) ul;
2620 if (target_id != ul)
2623 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2625 rbd_dev = __rbd_get_dev(target_id);
2631 __rbd_remove_all_snaps(rbd_dev);
2632 rbd_bus_del_dev(rbd_dev);
2635 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store: create a snapshot with the written name,
 * refresh the in-core header, then (outside ctl_mutex) send a
 * best-effort notify so other clients re-read the header.  Error
 * labels and the success return are elided in this view.
 */
2639 static ssize_t rbd_snap_add(struct device *dev,
2640 struct device_attribute *attr,
2644 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2646 char *name = kmalloc(count + 1, GFP_KERNEL);
/* Size 'count' (not count + 1) deliberately drops the final byte of
 * buf -- presumably the trailing newline from "echo name > ..."; the
 * buffer itself is count + 1 so a full-length copy would still fit.
 * TODO(review): confirm this truncation is the intended newline strip. */
2650 snprintf(name, count, "%s", buf);
2652 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2654 ret = rbd_header_add_snap(rbd_dev,
2659 ret = __rbd_refresh_header(rbd_dev);
2663 /* shouldn't hold ctl_mutex when notifying.. notify might
2664 trigger a watch callback that would need to get that mutex */
2665 mutex_unlock(&ctl_mutex);
2667 /* make a best effort, don't error if failed */
2668 rbd_req_sync_notify(rbd_dev);
2675 mutex_unlock(&ctl_mutex);
/*
 * Create the driver's sysfs skeleton: the rbd root device and the
 * rbd bus (whose bus attributes provide the add/remove controls).
 * On bus registration failure the root device is rolled back.
 */
2681 * create control files in sysfs
2684 static int rbd_sysfs_init(void)
2688 ret = device_register(&rbd_root_dev);
2692 ret = bus_register(&rbd_bus_type);
2694 device_unregister(&rbd_root_dev);
/* Tear down sysfs state in reverse order of creation. */
2699 static void rbd_sysfs_cleanup(void)
2701 bus_unregister(&rbd_bus_type);
2702 device_unregister(&rbd_root_dev);
/* Module init: all setup is the sysfs registration; devices are
 * created later via the bus "add" file.  Error return is elided. */
2705 int __init rbd_init(void)
2709 rc = rbd_sysfs_init();
2712 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: removing the sysfs tree unregisters the bus; mapped
 * devices hold module references, so exit only runs once none remain. */
2716 void __exit rbd_exit(void)
2718 rbd_sysfs_cleanup();
2721 module_init(rbd_init);
2722 module_exit(rbd_exit);
2724 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2725 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2726 MODULE_DESCRIPTION("rados block device");
2728 /* following authorship retained from original osdblk.c */
2729 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2731 MODULE_LICENSE("GPL");