2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
27 1) Map a Linux block device to an existing rbd image.
29 Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
31 $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
33 The snapshot name can be "-" or omitted to map the image read/write.
35 2) List all active blkdev<->object mappings.
37 In this example, we have performed step #1 twice, creating two blkdevs,
38 mapped to two separate rados objects in the rados rbd pool
40 $ cat /sys/class/rbd/list
41 #id major client_name pool name snap KB
42 0 254 client4143 rbd foo - 1024000
44 The columns, in order, are:
46 - blkdev assigned major
49 - rados block device name
50 - mapped snapshot ("-" if none)
56 Usage: <blkdev id> <snapname>
58 $ echo "0 mysnap" > /sys/class/rbd/snap_create
61 4) Listing a snapshot.
63 $ cat /sys/class/rbd/snaps_list
68 The columns, in order, are:
70 - snapshot name, '-' means none (active read/write version)
71 - size of device at time of snapshot
72 - the (*) indicates this is the active version
74 5) Rollback to snapshot.
76 Usage: <blkdev id> <snapname>
78 $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
81 6) Mapping an image using snapshot.
83 A snapshot mapping is read-only. It is requested by passing
84 snap=<snapname> in the options when adding a device.
86 $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
89 7) Remove an active blkdev<->rbd image mapping.
91 In this example, we remove the mapping with blkdev unique id 1.
93 $ echo 1 > /sys/class/rbd/remove
96 NOTE: The actual creation and deletion of rados objects is outside the scope
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
112 #include "rbd_types.h"
/* Driver names: short form is used for the sysfs class and disk names. */
114 #define DRV_NAME "rbd"
115 #define DRV_NAME_LONG "rbd (rados block device)"
/* Passed to alloc_disk() when the gendisk is created. */
117 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
/*
 * Buffer size limits.  RBD_SUFFIX is not defined in this file; presumably
 * it comes from rbd_types.h -- confirm.
 */
119 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
120 #define RBD_MAX_POOL_NAME_LEN 64
121 #define RBD_MAX_SNAP_NAME_LEN 32
122 #define RBD_MAX_OPT_LEN 1024
/* Snapshot name that denotes the writable head of the image. */
124 #define RBD_SNAP_HEAD_NAME "-"
/* Size of rbd_device.name. */
126 #define DEV_NAME_LEN 32
129 * block device image metadata (in-memory version)
/*
 * In-memory image header.  Only some members are visible here; the rest of
 * the structure (image_size, obj_order, snap_names, snap_sizes, ... as used
 * by the functions below) is declared on lines not shown.
 */
131 struct rbd_image_header {
/* taken for read around request setup and for write when snapshots change */
137 struct rw_semaphore snap_rwsem;
/* snapshot context: snapshot ids plus the seq used for writes */
138 struct ceph_snap_context *snapc;
/* total byte length of the packed snapshot-name buffer */
139 size_t snap_names_len;
148 * an instance of the client. multiple devices may share a client.
/* underlying ceph client handle created by ceph_create_client() */
151 struct ceph_client *client;
/* link in rbd_client_list (guarded by node_lock) */
153 struct list_head node;
157 * a single io request
/* Per-I/O bookkeeping attached to an OSD request via r_priv. */
160 struct request *rq; /* blk layer request */
161 struct bio *bio; /* cloned bio */
162 struct page **pages; /* list of used pages */
/*
 * Per-mapping device state (members of the rbd device structure; its
 * opening line is not visible here).
 */
170 int id; /* blkdev unique id */
172 int major; /* blkdev assigned major */
173 struct gendisk *disk; /* blkdev's gendisk and rq */
174 struct request_queue *q;
/* client is a cached copy of rbd_client->client; both are set and cleared together */
176 struct ceph_client *client;
177 struct rbd_client *rbd_client;
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
181 spinlock_t lock; /* queue lock */
183 struct rbd_image_header header;
184 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
186 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187 char pool_name[RBD_MAX_POOL_NAME_LEN];
/* "-" (RBD_SNAP_HEAD_NAME) when the head is mapped */
190 char snap_name[RBD_MAX_SNAP_NAME_LEN];
191 u32 cur_snap; /* index+1 of current snapshot within snap context
195 struct list_head node;
/*
 * File-scope state.  node_lock is initialised at run time with
 * spin_lock_init() in rbd_init(); the two lists hold every mapped device
 * and every ceph client instance respectively.
 */
198 static spinlock_t node_lock; /* protects client get/put */
200 static struct class *class_rbd; /* /sys/class/rbd */
201 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
202 static LIST_HEAD(rbd_dev_list); /* devices */
203 static LIST_HEAD(rbd_client_list); /* clients */
/*
 * Block-device open hook.  Copies the mapping's read_only flag onto the
 * block device and tests for a write open of a read-only mapping.
 * NOTE(review): the statement executed when that test fires is on a line
 * not visible here -- presumably an error return; confirm.
 */
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
208 struct gendisk *disk = bdev->bd_disk;
209 struct rbd_device *rbd_dev = disk->private_data;
211 set_device_ro(bdev, rbd_dev->read_only);
213 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
/*
 * Block device operations table installed as disk->fops in rbd_init_disk().
 * Only the owner member is visible here.
 */
219 static const struct block_device_operations rbd_bd_ops = {
220 .owner = THIS_MODULE,
225 * Initialize an rbd client instance.
/*
 * Allocate an rbd_client, create a ceph client from @opt, open a session
 * and publish the new client on rbd_client_list under node_lock.
 * Returns the new client or an ERR_PTR.
 *
 * NOTE(review): the only error return visible below is ERR_PTR(-ENOMEM),
 * although ceph_open_session()'s result is stored in ret -- confirm whether
 * the real error code is propagated on a path not visible here.
 */
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
230 struct rbd_client *rbdc;
233 dout("rbd_client_create\n");
234 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
/* the creator holds the initial reference */
238 kref_init(&rbdc->kref);
239 INIT_LIST_HEAD(&rbdc->node);
241 rbdc->client = ceph_create_client(opt, rbdc);
242 if (IS_ERR(rbdc->client))
245 ret = ceph_open_session(rbdc->client);
/* make the client findable by __rbd_client_find() */
249 spin_lock(&node_lock);
250 list_add_tail(&rbdc->node, &rbd_client_list);
251 spin_unlock(&node_lock);
253 dout("rbd_client_create created %p\n", rbdc);
/* error unwinding */
257 ceph_destroy_client(rbdc->client);
263 ceph_destroy_options(opt);
264 return ERR_PTR(-ENOMEM);
268 * Find a ceph client with specific addr and configuration.
/*
 * Scan rbd_client_list for a client whose options compare equal to @opt
 * (ceph_compare_options() == 0).  The CEPH_OPT_NOSHARE flag is tested
 * first; presumably it suppresses sharing -- the branch body is not
 * visible.  The caller in this file holds node_lock around the call.
 */
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
272 struct rbd_client *client_node;
274 if (opt->flags & CEPH_OPT_NOSHARE)
277 list_for_each_entry(client_node, &rbd_client_list, node)
278 if (ceph_compare_options(opt, client_node->client) == 0)
284 * Get a ceph client with specific addr and configuration, if one does
285 * not exist create it.
/*
 * Attach @rbd_dev to a ceph client matching @mon_addr and the option
 * string.  The options are parsed, then rbd_client_list is searched under
 * node_lock: an existing client is reused with an extra kref and the
 * freshly parsed options are destroyed; otherwise a new client is created
 * after the lock is dropped.  On success both rbd_dev->rbd_client and
 * rbd_dev->client are set.
 */
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
290 struct rbd_client *rbdc;
291 struct ceph_options *opt;
294 ret = ceph_parse_options(&opt, options, mon_addr,
295 mon_addr + strlen(mon_addr), NULL, NULL);
299 spin_lock(&node_lock);
300 rbdc = __rbd_client_find(opt);
302 ceph_destroy_options(opt);
304 /* using an existing client */
305 kref_get(&rbdc->kref);
306 rbd_dev->rbd_client = rbdc;
307 rbd_dev->client = rbdc->client;
308 spin_unlock(&node_lock);
/* no shareable client: create one outside the spinlock */
311 spin_unlock(&node_lock);
313 rbdc = rbd_client_create(opt);
315 return PTR_ERR(rbdc);
317 rbd_dev->rbd_client = rbdc;
318 rbd_dev->client = rbdc->client;
323 * Destroy ceph client
/*
 * kref release callback (passed to kref_put() in rbd_put_client()).
 * Unlinks the client from rbd_client_list under node_lock and destroys the
 * underlying ceph client.
 */
325 static void rbd_client_release(struct kref *kref)
327 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
329 dout("rbd_release_client %p\n", rbdc);
330 spin_lock(&node_lock);
331 list_del(&rbdc->node);
332 spin_unlock(&node_lock);
334 ceph_destroy_client(rbdc->client);
339 * Drop reference to ceph client node. If it's not referenced anymore, release
342 static void rbd_put_client(struct rbd_device *rbd_dev)
344 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345 rbd_dev->rbd_client = NULL;
346 rbd_dev->client = NULL;
351 * Create a new header structure, translate header format from the on-disk
/*
 * Build the in-memory header from the little-endian on-disk header:
 * allocates the snap context, the snapshot-name buffer and the
 * snapshot-size array, copies the scalar fields, and copies the per-snapshot
 * ids, sizes and names.
 *
 * NOTE(review): snap_count and snap_names_len are taken straight from the
 * on-disk header and used in kmalloc() sizes / multiplications; no bounds
 * or overflow check is visible here -- confirm one exists or add it.
 */
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355 struct rbd_image_header_ondisk *ondisk,
360 u32 snap_count = le32_to_cpu(ondisk->snap_count);
363 init_rwsem(&header->snap_rwsem);
365 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
368 sizeof(struct rbd_image_snap_ondisk),
373 header->snap_names = kmalloc(header->snap_names_len,
375 if (!header->snap_names)
377 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
379 if (!header->snap_sizes)
382 header->snap_names = NULL;
383 header->snap_sizes = NULL;
385 memcpy(header->block_name, ondisk->block_name,
386 sizeof(ondisk->block_name));
388 header->image_size = le64_to_cpu(ondisk->image_size);
389 header->obj_order = ondisk->options.order;
390 header->crypt_type = ondisk->options.crypt_type;
391 header->comp_type = ondisk->options.comp_type;
/* the header owns the single initial reference on the snap context */
393 atomic_set(&header->snapc->nref, 1);
394 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395 header->snapc->num_snaps = snap_count;
396 header->total_snaps = snap_count;
399 allocated_snaps == snap_count) {
400 for (i = 0; i < snap_count; i++) {
401 header->snapc->snaps[i] =
402 le64_to_cpu(ondisk->snaps[i].id);
403 header->snap_sizes[i] =
404 le64_to_cpu(ondisk->snaps[i].image_size);
407 /* copy snapshot names */
/* i == snap_count here, so the source is the first byte after the snap array */
408 memcpy(header->snap_names, &ondisk->snaps[i],
409 header->snap_names_len);
/* error unwinding */
415 kfree(header->snap_names);
417 kfree(header->snapc);
421 static int snap_index(struct rbd_image_header *header, int snap_num)
423 return header->total_snaps - snap_num;
/*
 * Return the snapshot id the device is currently mapped to, looked up in
 * the snap context through snap_index().  cur_snap == 0 is tested first
 * (head mapping); the value returned in that case is on a line not visible
 * here.
 */
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
428 struct rbd_image_header *header = &rbd_dev->header;
430 if (!rbd_dev->cur_snap)
433 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
/*
 * Look up @snap_name in the header.  Snapshot names are stored back to back
 * as NUL-terminated strings in header->snap_names, so the scan advances by
 * strlen(p) + 1.  On a match the snapshot id and size are stored through
 * the output pointers.
 * Presumably returns the matching index, or a negative error when i reaches
 * total_snaps -- the return statements are not visible; confirm.
 */
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
440 char *p = header->snap_names;
442 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443 if (strcmp(snap_name, p) == 0)
446 if (i == header->total_snaps)
449 *seq = header->snapc->snaps[i];
452 *size = header->snap_sizes[i];
/*
 * Select which version of the image the device exposes, holding snap_rwsem
 * for write.  For the head name the snap context seq is taken from
 * header->snap_seq (when snapshots exist) and *size is the image size;
 * otherwise the snapshot is looked up by name and cur_snap is derived from
 * the lookup result.
 *
 * NOTE(review): RBD_SNAP_HEAD_NAME is defined as "-", so the two strcmp()
 * tests below compare against the same string.
 */
457 static int rbd_header_set_snap(struct rbd_device *dev,
458 const char *snap_name,
461 struct rbd_image_header *header = &dev->header;
462 struct ceph_snap_context *snapc = header->snapc;
465 down_write(&header->snap_rwsem);
469 strcmp(snap_name, "-") == 0 ||
470 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471 if (header->total_snaps)
472 snapc->seq = header->snap_seq;
478 *size = header->image_size;
480 ret = snap_by_name(header, snap_name, &snapc->seq, size);
484 dev->cur_snap = header->total_snaps - ret;
490 up_write(&header->snap_rwsem);
494 static void rbd_header_free(struct rbd_image_header *header)
496 kfree(header->snapc);
497 kfree(header->snap_names);
498 kfree(header->snap_sizes);
502 * get the actual striped segment name, offset and length
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505 const char *block_name,
507 char *seg_name, u64 *segofs)
509 u64 seg = ofs >> header->obj_order;
512 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513 "%s.%012llx", block_name, seg);
515 ofs = ofs & ((1 << header->obj_order) - 1);
516 len = min_t(u64, len, (1 << header->obj_order) - ofs);
/*
 * Walk a bio chain linked through bi_next.  Presumably each bio is released
 * as the walk advances -- the release call is on a line not visible here;
 * confirm.
 */
528 static void bio_chain_put(struct bio *chain)
534 chain = chain->bi_next;
540 * zeros a bio chain, starting at specific offset
/*
 * Zero the data of a bio chain from byte offset @start_ofs to the end.
 * Every segment that extends past @start_ofs is mapped and cleared from the
 * point where it crosses @start_ofs; segments entirely before it are left
 * untouched.  (pos is presumably advanced by bv_len per segment on a line
 * not visible here.)
 */
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
551 bio_for_each_segment(bv, chain, i) {
552 if (pos + bv->bv_len > start_ofs) {
/* bytes of this segment that lie before start_ofs and must be preserved */
553 int remainder = max(start_ofs - pos, 0);
554 buf = bvec_kmap_irq(bv, &flags);
555 memset(buf + remainder, 0,
556 bv->bv_len - remainder);
557 bvec_kunmap_irq(bv, &flags);
562 chain = chain->bi_next;
567 * bio_chain_clone - clone a chain of bios up to a certain length.
568 * might return a bio_pair that will need to be released.
/*
 * Clone bios from the chain at *old until @len bytes are covered.  A
 * bio_pair left in *bp by an earlier call is released first.  When a bio
 * would exceed @len it is split with bio_split() and only the first half is
 * cloned.  The allocation mask loses __GFP_WAIT after the first iteration.
 * On error the partially built chain is dropped with bio_chain_put().
 *
 * NOTE(review): "bp = bio_split(...)" and "&bp->bio1" below treat bp as a
 * struct bio_pair *, but the parameter is struct bio_pair **.  Presumably a
 * local declaration on a line not visible here shadows the parameter; if
 * so, the split pair is never handed back through *bp and the caller's
 * bio_pair_release() never sees it -- confirm.
 */
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571 struct bio_pair **bp,
572 int len, gfp_t gfpmask)
574 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
578 bio_pair_release(*bp);
582 while (old_chain && (total < len)) {
583 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
587 if (total + old_chain->bi_size > len) {
591 * this split can only happen with a single paged bio,
592 * split_bio will BUG_ON if this is not the case
594 dout("bio_chain_clone split! total=%d remaining=%d"
596 (int)total, (int)len-total,
597 (int)old_chain->bi_size);
599 /* split the bio. We'll release it either in the next
600 call, or it will have to be released outside */
601 bp = bio_split(old_chain, (len - total) / 512ULL);
605 __bio_clone(tmp, &bp->bio1);
609 __bio_clone(tmp, old_chain);
610 *next = old_chain->bi_next;
614 gfpmask &= ~__GFP_WAIT;
618 new_chain = tail = tmp;
623 old_chain = old_chain->bi_next;
625 total += tmp->bi_size;
631 tail->bi_next = NULL;
638 dout("bio_chain_clone with err\n");
639 bio_chain_put(new_chain);
644 * helpers for osd request op vectors.
/*
 * Allocate a zeroed op vector with room for num_ops + 1 entries (the extra
 * zeroed entry presumably acts as a terminator -- confirm) and fill in the
 * opcode and payload length of the first op.  The result is returned
 * through *ops and is released with rbd_destroy_ops().
 */
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
651 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
655 (*ops)[0].op = opcode;
657 * op extent offset and length will be set later on
658 * in calc_raw_layout()
660 (*ops)[0].payload_len = payload_len;
/*
 * Counterpart of rbd_create_rw_ops(): releases an op vector.  The body is
 * not visible here.
 */
664 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
670 * Send ceph osd request
/*
 * Build and submit one OSD request for object @obj.
 *
 * Allocates the per-request bookkeeping, allocates the OSD request under
 * snap_rwsem (read), fills in the object id and a single-stripe file layout
 * for the device's pool, builds the request and starts it.  When a callback
 * is supplied it is installed as r_callback; the visible wait/put pair is
 * presumably the path taken when no callback is given -- confirm.
 * If @rq is involved in a failure, the block request is completed with the
 * error via blk_end_request().
 *
 * NOTE(review): strncpy() does not NUL-terminate when @obj is at least
 * sizeof(req->r_oid) long, and the following strlen(req->r_oid) would then
 * read past the buffer.  No length check on @obj is visible here.
 */
672 static int rbd_do_request(struct request *rq,
673 struct rbd_device *dev,
674 struct ceph_snap_context *snapc,
676 const char *obj, u64 ofs, u64 len,
681 struct ceph_osd_req_op *ops,
683 void (*rbd_cb)(struct ceph_osd_request *req,
684 struct ceph_msg *msg))
686 struct ceph_osd_request *req;
687 struct ceph_file_layout *layout;
690 struct timespec mtime = CURRENT_TIME;
691 struct rbd_request *req_data;
692 struct ceph_osd_request_head *reqhead;
693 struct rbd_image_header *header = &dev->header;
696 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
700 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
/* held from here until the request is fully built */
702 down_read(&header->snap_rwsem);
704 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
708 GFP_NOIO, pages, bio);
710 up_read(&header->snap_rwsem);
715 req->r_callback = rbd_cb;
719 req_data->pages = pages;
/* lets the completion callback find the bookkeeping */
722 req->r_priv = req_data;
724 reqhead = req->r_request->front.iov_base;
725 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
727 strncpy(req->r_oid, obj, sizeof(req->r_oid));
728 req->r_oid_len = strlen(req->r_oid);
/* one stripe, stripe unit == object size == 1 << RBD_MAX_OBJ_ORDER */
730 layout = &req->r_file_layout;
731 memset(layout, 0, sizeof(*layout));
732 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733 layout->fl_stripe_count = cpu_to_le32(1);
734 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735 layout->fl_pg_preferred = cpu_to_le32(-1);
736 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738 ofs, &len, &bno, req, ops);
740 ceph_osdc_build_request(req, ofs, &len,
744 req->r_oid, req->r_oid_len);
745 up_read(&header->snap_rwsem);
747 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
752 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753 ceph_osdc_put_request(req);
/* error unwinding */
758 bio_chain_put(req_data->bio);
759 ceph_osdc_put_request(req);
764 blk_end_request(rq, ret, len);
769 * Ceph osd op callback
/*
 * Completion callback for asynchronous OSD requests.  Decodes the result,
 * transferred length and opcode from the first op of the reply, then:
 *  - a read that hit a missing object (-ENOENT) has its whole bio chain
 *    zeroed;
 *  - a successful short read has the tail of the bio chain zeroed and is
 *    reported as a full-length transfer.
 * Finally the block request is completed and the cloned bio chain and the
 * OSD request are released.
 */
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
773 struct rbd_request *req_data = req->r_priv;
774 struct ceph_osd_reply_head *replyhead;
775 struct ceph_osd_op *op;
781 replyhead = msg->front.iov_base;
782 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
/* the op array immediately follows the reply head */
783 op = (void *)(replyhead + 1);
784 rc = le32_to_cpu(replyhead->result);
785 bytes = le64_to_cpu(op->extent.length);
786 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
788 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
790 if (rc == -ENOENT && read_op) {
791 zero_bio_chain(req_data->bio, 0);
793 } else if (rc == 0 && read_op && bytes < req_data->len) {
794 zero_bio_chain(req_data->bio, bytes);
795 bytes = req_data->len;
798 blk_end_request(req_data->rq, rc, bytes);
801 bio_chain_put(req_data->bio);
803 ceph_osdc_put_request(req);
808 * Do a synchronous ceph osd operation
/*
 * Run one OSD operation synchronously through a temporary page vector.
 * A page vector covering (ofs, len) is allocated, an op vector is created
 * by rbd_create_rw_ops(), write data is copied in from @buf, the request is
 * issued with no block-layer request (rq == NULL), and for reads the result
 * is copied back out using the request's return value as the length.
 * The op vector and the page vector are released before returning.
 */
810 static int rbd_req_sync_op(struct rbd_device *dev,
811 struct ceph_snap_context *snapc,
815 struct ceph_osd_req_op *orig_ops,
824 struct ceph_osd_req_op *ops = orig_ops;
827 num_pages = calc_pages_for(ofs , len);
828 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
/* only writes carry a payload */
833 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
838 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
845 ret = rbd_do_request(NULL, dev, snapc, snapid,
855 if ((flags & CEPH_OSD_FLAG_READ) && buf)
856 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
860 rbd_destroy_ops(ops);
862 ceph_release_page_vector(pages, num_pages);
867 * Do an asynchronous ceph osd operation
/*
 * Issue an OSD operation for a block-layer request.  The image offset is
 * translated to a segment name, segment offset and segment length with
 * rbd_get_segment(), a single-op vector is created, and the request is
 * handed to rbd_do_request().  The request must already fit within one
 * segment; BUG_ON fires if rbd_get_segment() had to shorten it.
 */
869 static int rbd_do_op(struct request *rq,
870 struct rbd_device *rbd_dev ,
871 struct ceph_snap_context *snapc,
873 int opcode, int flags, int num_reply,
881 struct ceph_osd_req_op *ops;
884 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
888 seg_len = rbd_get_segment(&rbd_dev->header,
889 rbd_dev->header.block_name,
895 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
897 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
901 /* we've taken care of segment sizes earlier when we
902 cloned the bios. We should never have a segment
903 truncated at this point */
904 BUG_ON(seg_len < len);
906 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
907 seg_name, seg_ofs, seg_len,
920 * Request async osd write
/*
 * Asynchronous write wrapper around rbd_do_op(): passes the caller's snap
 * context, CEPH_NOSNAP as the snapshot id, and the WRITE | ONDISK flags.
 */
922 static int rbd_req_write(struct request *rq,
923 struct rbd_device *rbd_dev,
924 struct ceph_snap_context *snapc,
928 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
930 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
936 * Request async osd read
/*
 * Asynchronous read wrapper around rbd_do_op(): no snap context is passed,
 * and a snapid of 0 is mapped to CEPH_NOSNAP (read from the head).
 */
938 static int rbd_req_read(struct request *rq,
939 struct rbd_device *rbd_dev,
944 return rbd_do_op(rq, rbd_dev, NULL,
945 (snapid ? snapid : CEPH_NOSNAP),
953 * Request sync osd read
/*
 * Synchronous read of @len bytes at @ofs of object @obj into @buf via
 * rbd_req_sync_op().  A snapid of 0 is mapped to CEPH_NOSNAP.
 * NOTE(review): the snapc parameter is accepted but NULL is what is
 * forwarded to rbd_req_sync_op().
 */
955 static int rbd_req_sync_read(struct rbd_device *dev,
956 struct ceph_snap_context *snapc,
962 return rbd_req_sync_op(dev, NULL,
963 (snapid ? snapid : CEPH_NOSNAP),
967 1, obj, ofs, len, buf);
971 * Request sync osd rollback of a single object to a snapshot
/*
 * Synchronously roll one object back to snapshot @snapid.  Builds a single
 * CEPH_OSD_OP_ROLLBACK op carrying the snapshot id and submits it as a
 * WRITE | ONDISK operation; the op vector is released afterwards.
 */
973 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
977 struct ceph_osd_req_op *ops;
978 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
982 ops[0].snap.snapid = snapid;
984 ret = rbd_req_sync_op(dev, NULL,
987 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
991 rbd_destroy_ops(ops);
1000 * Request sync osd class method call (CEPH_OSD_OP_CALL)
/*
 * Synchronously invoke object-class method @cls.@method on an object with
 * @len bytes of input data.  The op payload length is the sum of the class
 * name, method name and input lengths.  Submitted as WRITE | ONDISK.
 *
 * NOTE(review): the name lengths are narrowed with (__u8) casts, so a class
 * or method name longer than 255 bytes would be silently truncated in the
 * op while payload_len still uses the full lengths.
 */
1002 static int rbd_req_sync_exec(struct rbd_device *dev,
1009 struct ceph_osd_req_op *ops;
1010 int cls_len = strlen(cls);
1011 int method_len = strlen(method);
1012 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1013 cls_len + method_len + len);
1017 ops[0].cls.class_name = cls;
1018 ops[0].cls.class_len = (__u8)cls_len;
1019 ops[0].cls.method_name = method;
1020 ops[0].cls.method_len = (__u8)method_len;
1021 ops[0].cls.argc = 0;
1022 ops[0].cls.indata = data;
1023 ops[0].cls.indata_len = len;
1025 ret = rbd_req_sync_op(dev, NULL,
1028 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1030 1, obj, 0, 0, NULL);
1032 rbd_destroy_ops(ops);
1034 dout("cls_exec returned %d\n", ret);
1039 * block device queue callback
/*
 * Request-queue callback.  Fetches requests one at a time; non-filesystem
 * requests are completed immediately with status 0 and writes to a
 * read-only device fail with -EROFS.  For everything else the queue lock is
 * dropped, the request is cut into per-object pieces (rbd_get_segment()
 * gives the piece size, bio_chain_clone() the matching bio clone) and each
 * piece is issued as an OSD write or read.  A clone failure ends the whole
 * request with -ENOMEM.  The queue lock is re-taken before the next request
 * is fetched.
 */
1041 static void rbd_rq_fn(struct request_queue *q)
1043 struct rbd_device *rbd_dev = q->queuedata;
1045 struct bio_pair *bp = NULL;
1047 rq = blk_fetch_request(q);
1051 struct bio *rq_bio, *next_bio = NULL;
1053 int size, op_size = 0;
1056 /* peek at request from block layer */
1060 dout("fetched request\n");
1062 /* filter out block requests we don't understand */
1063 if ((rq->cmd_type != REQ_TYPE_FS)) {
1064 __blk_end_request_all(rq, 0);
1068 /* deduce our operation (read, write) */
1069 do_write = (rq_data_dir(rq) == WRITE);
1071 size = blk_rq_bytes(rq);
1072 ofs = blk_rq_pos(rq) * 512ULL;
1074 if (do_write && rbd_dev->read_only) {
1075 __blk_end_request_all(rq, -EROFS);
1079 spin_unlock_irq(q->queue_lock);
1081 dout("%s 0x%x bytes at 0x%llx\n",
1082 do_write ? "write" : "read",
1083 size, blk_rq_pos(rq) * 512ULL);
1086 /* a bio clone to be passed down to OSD req */
1087 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1088 op_size = rbd_get_segment(&rbd_dev->header,
1089 rbd_dev->header.block_name,
/* the queue lock is not held here, yet the allocation uses GFP_ATOMIC */
1092 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1093 op_size, GFP_ATOMIC);
1095 spin_lock_irq(q->queue_lock);
1096 __blk_end_request_all(rq, -ENOMEM);
1100 /* init OSD command: write or read */
1102 rbd_req_write(rq, rbd_dev,
1103 rbd_dev->header.snapc,
1107 rbd_req_read(rq, rbd_dev,
1108 cur_snap_id(rbd_dev),
1119 bio_pair_release(bp);
1121 spin_lock_irq(q->queue_lock);
1123 rq = blk_fetch_request(q);
1128 * a queue callback. Makes sure that we don't create a bio that spans across
1129 * multiple osd objects. One exception would be with a single page bios,
1130 * which we handle later at bio_chain_clone
/*
 * merge_bvec callback.  Computes how many more bytes may be added to a bio
 * before it would cross an object boundary: the object size in sectors is
 * 1 << (obj_order - 9), and the remaining room is that size minus the
 * bio's offset within the object and its current length.  An empty bio is
 * always allowed to take one whole bvec even if that crosses the boundary.
 */
1132 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1133 struct bio_vec *bvec)
1135 struct rbd_device *rbd_dev = q->queuedata;
1136 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1137 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1138 unsigned int bio_sectors = bmd->bi_size >> 9;
1141 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1142 + bio_sectors)) << 9;
1144 max = 0; /* bio_add cannot handle a negative return */
1145 if (max <= bvec->bv_len && bio_sectors == 0)
1146 return bvec->bv_len;
/*
 * Tear down the gendisk side of a mapping: frees the in-memory header and
 * cleans up the request queue.  The GENHD_FL_UP test guards a call on a
 * line not visible here (presumably removal of a disk that was added --
 * confirm).
 */
1150 static void rbd_free_disk(struct rbd_device *rbd_dev)
1152 struct gendisk *disk = rbd_dev->disk;
1157 rbd_header_free(&rbd_dev->header);
1159 if (disk->flags & GENHD_FL_UP)
1162 blk_cleanup_queue(disk->queue);
1167 * reload the on-disk header
/*
 * Read the on-disk header object (rbd_dev->obj_md_name) and convert it into
 * @header.  The read buffer is sized from the current guess of snap_count
 * and snap_names_len; if the header just read reports a different snapshot
 * count, the guesses are updated from it and the converted header is freed,
 * presumably so the read can be retried with a correctly sized buffer --
 * the loop construct itself is not visible here.
 */
1169 static int rbd_read_header(struct rbd_device *rbd_dev,
1170 struct rbd_image_header *header)
1173 struct rbd_image_header_ondisk *dh;
1175 u64 snap_names_len = 0;
1178 int len = sizeof(*dh) +
1179 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1183 dh = kmalloc(len, GFP_KERNEL);
1187 rc = rbd_req_sync_read(rbd_dev,
1189 rbd_dev->obj_md_name,
1195 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1199 if (snap_count != header->total_snaps) {
1200 snap_count = header->total_snaps;
1201 snap_names_len = header->snap_names_len;
1202 rbd_header_free(header);
/*
 * Create a snapshot named @snap_name.  A new snapshot id is obtained from
 * the monitor for the device's pool, then the name and the id are encoded
 * into a buffer of name_len + 16 bytes and passed to the "snap_add" method
 * of the "rbd" object class on the header object.  Afterwards the
 * in-memory snap context seq is set to the new id.
 */
1217 static int rbd_header_add_snap(struct rbd_device *dev,
1218 const char *snap_name,
1221 int name_len = strlen(snap_name);
1224 void *data, *data_start, *data_end;
1226 /* we should create a snapshot only if we're pointing at the head */
1230 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1232 dout("created snapid=%lld\n", new_snapid);
1236 data = kmalloc(name_len + 16, gfp_flags);
1241 data_end = data + name_len + 16;
/* the *_safe encoders jump to the "bad" label if the buffer would overflow */
1243 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1244 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1246 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1247 data_start, data - data_start);
1254 dev->header.snapc->seq = new_snapid;
1262 * re-read the ondisk header and replace the in-memory snapshot info, keeping the current snapc seq
/*
 * Refresh the device's snapshot information.  A fresh header is read into a
 * temporary, then under snap_rwsem (write) the old snap context, name
 * buffer and size array are freed and replaced by the new ones.  The snap
 * context seq that was in use before the refresh is carried over.
 */
1264 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1267 struct rbd_image_header h;
1270 ret = rbd_read_header(rbd_dev, &h);
1274 down_write(&rbd_dev->header.snap_rwsem);
/* remember the seq before the old context is freed */
1276 snap_seq = rbd_dev->header.snapc->seq;
1278 kfree(rbd_dev->header.snapc);
1279 kfree(rbd_dev->header.snap_names);
1280 kfree(rbd_dev->header.snap_sizes);
1282 rbd_dev->header.total_snaps = h.total_snaps;
1283 rbd_dev->header.snapc = h.snapc;
1284 rbd_dev->header.snap_names = h.snap_names;
1285 rbd_dev->header.snap_sizes = h.snap_sizes;
1286 rbd_dev->header.snapc->seq = snap_seq;
1288 up_write(&rbd_dev->header.snap_rwsem);
/*
 * Read the image header, select the requested snapshot (which also yields
 * the device size), then create the gendisk and request queue: the disk is
 * named DRV_NAME<id>, uses rbd_bd_ops, and its queue is driven by
 * rbd_rq_fn() with rbd_merge_bvec() limiting bios to one object.  The
 * capacity is set in 512-byte sectors.
 */
1293 static int rbd_init_disk(struct rbd_device *rbd_dev)
1295 struct gendisk *disk;
1296 struct request_queue *q;
1300 /* contact OSD, request size info about the object being mapped */
1301 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1305 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1309 /* create gendisk info */
1311 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1315 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1316 disk->major = rbd_dev->major;
1317 disk->first_minor = 0;
1318 disk->fops = &rbd_bd_ops;
1319 disk->private_data = rbd_dev;
/* the per-device spinlock doubles as the queue lock */
1323 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1326 blk_queue_merge_bvec(q, rbd_merge_bvec);
1329 q->queuedata = rbd_dev;
1331 rbd_dev->disk = disk;
1334 /* finally, announce the disk to the world */
1335 set_capacity(disk, total_size / 512ULL);
1338 pr_info("%s: added with size 0x%llx\n",
1339 disk->disk_name, (unsigned long long)total_size);
1348 /********************************************************************
1350 * add map rados objects to blkdev
1351 * remove unmap rados objects
1352 * list show mappings
1353 *******************************************************************/
/*
 * class_release hook installed on class_rbd in rbd_sysfs_init().  The body
 * is not visible here.
 */
1355 static void class_rbd_release(struct class *cls)
/*
 * sysfs "list" show handler: writes a header row followed by one
 * tab-separated row per mapped device, under ctl_mutex.
 *
 * NOTE(review): snprintf() returns the length that would have been written,
 * so once the page is full n can exceed max and "max - n" goes negative;
 * scnprintf() avoids that -- confirm and consider switching.
 */
1360 static ssize_t class_rbd_list(struct class *c,
1361 struct class_attribute *attr,
1365 struct list_head *tmp;
1366 int max = PAGE_SIZE;
1368 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1370 n += snprintf(data, max,
1371 "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1373 list_for_each(tmp, &rbd_dev_list) {
1374 struct rbd_device *rbd_dev;
1376 rbd_dev = list_entry(tmp, struct rbd_device, node);
1377 n += snprintf(data+n, max-n,
1378 "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1381 ceph_client_id(rbd_dev->client),
1383 rbd_dev->obj, rbd_dev->snap_name,
/* image size is reported in KB */
1384 rbd_dev->header.image_size >> 10);
1389 mutex_unlock(&ctl_mutex);
/*
 * sysfs "add" store handler: maps an rbd image to a new block device.
 *
 * Takes a module reference, allocates scratch buffers and the rbd_device,
 * picks the next free id and links the device into rbd_dev_list under
 * ctl_mutex, parses "<mon addr> <options> <pool> <image> [snap]" from @buf,
 * obtains a ceph client, resolves the pool id, registers a block major and
 * finally sets up the disk.  The error labels unwind in reverse order and
 * drop the module reference.
 *
 * NOTE(review): each sscanf() field width equals the full size of its
 * destination (RBD_MAX_OPT_LEN for the two kmalloc'ed buffers,
 * RBD_MAX_POOL_NAME_LEN / RBD_MAX_OBJ_NAME_LEN / RBD_MAX_SNAP_NAME_LEN for
 * the rbd_device arrays).  "%Ns" stores up to N characters plus a
 * terminating NUL, so a maximum-length token writes one byte past the
 * buffer.  The widths should be one less than the buffer sizes, or the
 * buffers one larger.
 */
1393 static ssize_t class_rbd_add(struct class *c,
1394 struct class_attribute *attr,
1395 const char *buf, size_t count)
1397 struct ceph_osd_client *osdc;
1398 struct rbd_device *rbd_dev;
1399 ssize_t rc = -ENOMEM;
1400 int irc, new_id = 0;
1401 struct list_head *tmp;
/* keep the module loaded for as long as the mapping exists */
1405 if (!try_module_get(THIS_MODULE))
1408 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1412 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1416 /* new rbd_device object */
1417 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1421 /* static rbd_device initialization */
1422 spin_lock_init(&rbd_dev->lock);
1423 INIT_LIST_HEAD(&rbd_dev->node);
1425 /* generate unique id: find highest unique id, add one */
1426 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1428 list_for_each(tmp, &rbd_dev_list) {
/* this declaration shadows the function-level rbd_dev inside the loop */
1429 struct rbd_device *rbd_dev;
1431 rbd_dev = list_entry(tmp, struct rbd_device, node);
1432 if (rbd_dev->id >= new_id)
1433 new_id = rbd_dev->id + 1;
1436 rbd_dev->id = new_id;
1438 /* add to global list */
1439 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1441 /* parse add command */
1442 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1443 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1444 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1445 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1446 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1447 mon_dev_name, options, rbd_dev->pool_name,
1448 rbd_dev->obj, rbd_dev->snap_name) < 4) {
/* no snapshot given: map the head ('-'); the buffer was zeroed by kzalloc */
1453 if (rbd_dev->snap_name[0] == 0)
1454 rbd_dev->snap_name[0] = '-';
1456 rbd_dev->obj_len = strlen(rbd_dev->obj);
1457 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1458 rbd_dev->obj, RBD_SUFFIX);
1460 /* initialize rest of new object */
1461 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1462 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1466 mutex_unlock(&ctl_mutex);
1469 osdc = &rbd_dev->client->osdc;
1470 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1472 goto err_out_client;
1473 rbd_dev->poolid = rc;
1475 /* register our block device */
1476 irc = register_blkdev(0, rbd_dev->name);
1479 goto err_out_client;
1481 rbd_dev->major = irc;
1483 /* set up and announce blkdev mapping */
1484 rc = rbd_init_disk(rbd_dev);
1486 goto err_out_blkdev;
/* error unwinding */
1491 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1493 rbd_put_client(rbd_dev);
1494 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1496 list_del_init(&rbd_dev->node);
1497 mutex_unlock(&ctl_mutex);
1503 kfree(mon_dev_name);
1505 dout("Error adding device %s\n", buf);
1506 module_put(THIS_MODULE);
/*
 * Find the mapped device with the given id by scanning rbd_dev_list.
 * Every caller in this file holds ctl_mutex around the call.
 */
1510 static struct rbd_device *__rbd_get_dev(unsigned long id)
1512 struct list_head *tmp;
1513 struct rbd_device *rbd_dev;
1515 list_for_each(tmp, &rbd_dev_list) {
1516 rbd_dev = list_entry(tmp, struct rbd_device, node);
1517 if (rbd_dev->id == id)
/*
 * sysfs "remove" store handler: unmaps the device whose id is written.
 * The id is parsed as an unsigned decimal and must survive the round trip
 * through int.  The device is unlinked from rbd_dev_list under ctl_mutex,
 * then its client reference, disk and block major are released and the
 * module reference taken in class_rbd_add() is dropped.
 */
1523 static ssize_t class_rbd_remove(struct class *c,
1524 struct class_attribute *attr,
1528 struct rbd_device *rbd_dev = NULL;
1532 rc = strict_strtoul(buf, 10, &ul);
1536 /* convert to int; abort if we lost anything in the conversion */
1537 target_id = (int) ul;
1538 if (target_id != ul)
1541 /* remove object from list immediately */
1542 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1544 rbd_dev = __rbd_get_dev(target_id);
1546 list_del_init(&rbd_dev->node);
1548 mutex_unlock(&ctl_mutex);
1553 rbd_put_client(rbd_dev);
1555 /* clean up and free blkdev */
1556 rbd_free_disk(rbd_dev);
1557 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1560 /* release module ref */
1561 module_put(THIS_MODULE);
/*
 * sysfs "snaps_list" show handler.  For every mapped device, under that
 * device's snap_rwsem (read), prints one row for the head
 * (RBD_SNAP_HEAD_NAME, current image size) and one row per snapshot (name
 * and size at snapshot time, both in KB).  The head row is marked " (*)"
 * when cur_snap is 0; a snapshot row carries a marker when it is the
 * currently mapped snapshot.
 */
1566 static ssize_t class_rbd_snaps_list(struct class *c,
1567 struct class_attribute *attr,
1570 struct rbd_device *rbd_dev = NULL;
1571 struct list_head *tmp;
1572 struct rbd_image_header *header;
1573 int i, n = 0, max = PAGE_SIZE;
1576 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1578 n += snprintf(data, max, "#id\tsnap\tKB\n");
1580 list_for_each(tmp, &rbd_dev_list) {
1582 struct ceph_snap_context *snapc;
1584 rbd_dev = list_entry(tmp, struct rbd_device, node);
1585 header = &rbd_dev->header;
1587 down_read(&header->snap_rwsem);
1589 names = header->snap_names;
1590 snapc = header->snapc;
1592 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1593 rbd_dev->id, RBD_SNAP_HEAD_NAME,
1594 header->image_size >> 10,
1595 (!rbd_dev->cur_snap ? " (*)" : ""));
/* names are packed NUL-terminated strings, hence the strlen(p) + 1 stride */
1600 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1601 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1602 rbd_dev->id, p, header->snap_sizes[i] >> 10,
1603 (rbd_dev->cur_snap &&
1604 (snap_index(header, i) == rbd_dev->cur_snap) ?
1610 up_read(&header->snap_rwsem);
1615 mutex_unlock(&ctl_mutex);
/*
 * sysfs "snaps_refresh" store handler: re-reads the snapshot information of
 * the device whose id is written, via rbd_update_snaps(), under ctl_mutex.
 * The id is parsed the same way as in class_rbd_remove().
 */
1619 static ssize_t class_rbd_snaps_refresh(struct class *c,
1620 struct class_attribute *attr,
1624 struct rbd_device *rbd_dev = NULL;
1629 rc = strict_strtoul(buf, 10, &ul);
1633 /* convert to int; abort if we lost anything in the conversion */
1634 target_id = (int) ul;
1635 if (target_id != ul)
1638 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1640 rbd_dev = __rbd_get_dev(target_id);
1646 rc = rbd_update_snaps(rbd_dev);
1651 mutex_unlock(&ctl_mutex);
/*
 * sysfs "snap_create" store handler: parses "<id> <snapname>", creates the
 * snapshot with rbd_header_add_snap() and then refreshes the in-memory
 * snapshot information, all under ctl_mutex.  The name buffer is allocated
 * with RBD_MAX_SNAP_NAME_LEN + 1 bytes, leaving room for the NUL after a
 * maximum-width "%s" conversion.
 */
1655 static ssize_t class_rbd_snap_create(struct class *c,
1656 struct class_attribute *attr,
1660 struct rbd_device *rbd_dev = NULL;
1664 name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1668 /* parse snaps add command */
1669 if (sscanf(buf, "%d "
1670 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1677 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1679 rbd_dev = __rbd_get_dev(target_id);
1685 ret = rbd_header_add_snap(rbd_dev,
1690 ret = rbd_update_snaps(rbd_dev);
1696 mutex_unlock(&ctl_mutex);
/*
 * sysfs "snap_rollback" store handler: parses "<id> <snapname>", resolves
 * the snapshot id, then walks the image one segment at a time and rolls
 * each object back with rbd_req_sync_rollback_obj().  A failure on one
 * object is only logged; the walk continues.  Snapshot information is
 * refreshed afterwards.  Runs under ctl_mutex.
 *
 * NOTE(review): snap_name is declared with RBD_MAX_SNAP_NAME_LEN bytes
 * while the sscanf() width is also RBD_MAX_SNAP_NAME_LEN.  If snap_name is
 * the conversion target (the argument list is not visible here), a
 * maximum-length name writes its NUL one byte past the array;
 * class_rbd_snap_create() sizes its buffer with + 1 for this reason.
 */
1702 static ssize_t class_rbd_rollback(struct class *c,
1703 struct class_attribute *attr,
1707 struct rbd_device *rbd_dev = NULL;
1710 char snap_name[RBD_MAX_SNAP_NAME_LEN];
1714 /* parse snaps add command */
1715 if (sscanf(buf, "%d "
1716 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1723 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1727 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1729 rbd_dev = __rbd_get_dev(target_id);
1735 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1739 dout("snapid=%lld\n", snapid);
/* rbd_get_segment() returns the bytes covered, advancing to the next object */
1742 while (cur_ofs < rbd_dev->header.image_size) {
1743 cur_ofs += rbd_get_segment(&rbd_dev->header,
1747 dout("seg_name=%s\n", seg_name);
1749 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1751 pr_warning("could not roll back obj %s err=%d\n",
1755 ret = rbd_update_snaps(rbd_dev);
1762 mutex_unlock(&ctl_mutex);
/*
 * Control files under /sys/class/rbd.  Mode 0200 entries are write-only
 * commands (store handler only); mode 0444 entries are read-only listings
 * (show handler only).
 */
1768 static struct class_attribute class_rbd_attrs[] = {
1769 __ATTR(add, 0200, NULL, class_rbd_add),
1770 __ATTR(remove, 0200, NULL, class_rbd_remove),
1771 __ATTR(list, 0444, class_rbd_list, NULL),
1772 __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh),
1773 __ATTR(snap_create, 0200, NULL, class_rbd_snap_create),
1774 __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL),
1775 __ATTR(snap_rollback, 0200, NULL, class_rbd_rollback),
1780 * create control files in sysfs
1781 * /sys/class/rbd/...
/*
 * Allocate and register the "rbd" sysfs class with its control attributes.
 * Once class_register() succeeds the files in class_rbd_attrs are reachable
 * from user space.  A failure is logged with pr_err().
 */
1783 static int rbd_sysfs_init(void)
1787 class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1791 class_rbd->name = DRV_NAME;
1792 class_rbd->owner = THIS_MODULE;
1793 class_rbd->class_release = class_rbd_release;
1794 class_rbd->class_attrs = class_rbd_attrs;
1796 ret = class_register(class_rbd);
1804 pr_err(DRV_NAME ": failed to create class rbd\n");
/*
 * Undo rbd_sysfs_init(): destroys the sysfs class.  Any surrounding guard
 * or pointer reset is on lines not visible here.
 */
1809 static void rbd_sysfs_cleanup(void)
1812 class_destroy(class_rbd);
1816 int __init rbd_init(void)
1820 rc = rbd_sysfs_init();
1823 spin_lock_init(&node_lock);
1824 pr_info("loaded " DRV_NAME_LONG "\n");
1828 void __exit rbd_exit(void)
1830 rbd_sysfs_cleanup();
/* Module entry/exit registration and metadata. */
1833 module_init(rbd_init);
1834 module_exit(rbd_exit);
1836 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1837 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1838 MODULE_DESCRIPTION("rados block device");
1840 /* following authorship retained from original osdblk.c */
1841 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1843 MODULE_LICENSE("GPL");