28670c0c68d52dec42501211357694affca4d501
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
struct rbd_image_header {
        u64 image_size;         /* le64 image_size from the on-disk header */
        /* NUL-terminated copy of the on-disk block_name; prefix of object names */
        char *object_prefix;
        __u8 obj_order;         /* image offsets are shifted by this to get a segment */
        __u8 crypt_type;
        __u8 comp_type;
        struct ceph_snap_context *snapc;        /* snapshot seq and ids */
        size_t snap_names_len;  /* size in bytes of the snap_names buffer */
        u32 total_snaps;        /* snapshot count; set alongside snapc->num_snaps */

        /* snapshot names packed back to back, each NUL-terminated */
        char *snap_names;
        /* per-snapshot image sizes, indexed like snapc->snaps[] */
        u64 *snap_sizes;

        u64 obj_version;
};
92
/* rbd-specific options filled in by parse_rbd_opts_token() */
struct rbd_options {
        /* "notify_timeout=%d"; starts as RBD_NOTIFY_TIMEOUT_DEFAULT */
        int     notify_timeout;
};
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
struct rbd_client {
        struct ceph_client      *client;        /* from ceph_create_client() */
        struct rbd_options      *rbd_opts;      /* freed in rbd_client_release() */
        struct kref             kref;           /* released via rbd_client_release() */
        struct list_head        node;           /* entry in rbd_client_list */
};
106
107 /*
108  * a request completion status
109  */
struct rbd_req_status {
        int done;       /* set to 1 once this sub-request has finished */
        int rc;         /* result later passed to __blk_end_request() */
        u64 bytes;      /* byte count later passed to __blk_end_request() */
};
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
/* Per-OSD-request context, hung off ceph_osd_request.r_priv by rbd_do_request() */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* requested length in bytes */
        int                     coll_index;     /* this request's slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* owning collection, may be NULL */
};
137
/* One snapshot of an image */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;   /* presumably an entry in rbd_device.snaps - confirm */
        u64                     id;
};
145
146 /*
147  * a single device
148  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;

        /* reference dropped (and pointer cleared) by rbd_put_client() */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* in-memory image header */
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
        char                    *pool_name;
        int                     pool_id;        /* used as fl_pg_pool for OSD requests */

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
        /*
         * name of the snapshot this device reads from;
         * RBD_SNAP_HEAD_NAME selects the image head
         */
        char                    *snap_name;
        /* id of the snapshot this device reads from */
        u64                     snap_id;        /* current snapshot id */
        /* whether the snap_id this device reads from still exists */
        bool                    snap_exists;
        /* 1 when mapped to a snapshot, 0 for the head; checked in rbd_open() */
        int                     read_only;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related; reference counted via rbd_get_dev()/rbd_put_dev() */
        struct device           dev;
};
190
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* clients; additions, removals and lookups are done under rbd_client_list_lock */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
/* Write-only (S_IWUSR) bus attributes: "add" -> rbd_add(), "remove" -> rbd_remove() */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};
217
/* The "rbd" bus, carrying the add/remove attributes above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
222
/* Release callback for the statically allocated rbd_root_dev: nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}
226
/* Statically allocated device named "rbd"; its release callback is a no-op */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
/* Block device methods: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
341 /*
342  * mount options
343  */
/*
 * Option tokens.  parse_rbd_opts_token() treats tokens below
 * Opt_last_int as integer-valued and tokens between Opt_last_int and
 * Opt_last_string as string-valued.
 */
enum {
        Opt_notify_timeout,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
};
351
/* Pattern table handed to match_token(); terminated by the {-1, NULL} entry */
static match_table_t rbd_opts_tokens = {
        {Opt_notify_timeout, "notify_timeout=%d"},
        /* int args above */
        /* string args above */
        {-1, NULL}
};
358
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361         struct rbd_options *rbd_opts = private;
362         substring_t argstr[MAX_OPT_ARGS];
363         int token, intval, ret;
364
365         token = match_token(c, rbd_opts_tokens, argstr);
366         if (token < 0)
367                 return -EINVAL;
368
369         if (token < Opt_last_int) {
370                 ret = match_int(&argstr[0], &intval);
371                 if (ret < 0) {
372                         pr_err("bad mount option arg (not int) "
373                                "at '%s'\n", c);
374                         return ret;
375                 }
376                 dout("got int token %d val %d\n", token, intval);
377         } else if (token > Opt_last_int && token < Opt_last_string) {
378                 dout("got string token %d val %s\n", token,
379                      argstr[0].from);
380         } else {
381                 dout("got token %d\n", token);
382         }
383
384         switch (token) {
385         case Opt_notify_timeout:
386                 rbd_opts->notify_timeout = intval;
387                 break;
388         default:
389                 BUG_ON(token);
390         }
391         return 0;
392 }
393
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399                                          size_t mon_addr_len,
400                                          char *options)
401 {
402         struct rbd_client *rbdc;
403         struct ceph_options *ceph_opts;
404         struct rbd_options *rbd_opts;
405
406         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407         if (!rbd_opts)
408                 return ERR_PTR(-ENOMEM);
409
410         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411
412         ceph_opts = ceph_parse_options(options, mon_addr,
413                                         mon_addr + mon_addr_len,
414                                         parse_rbd_opts_token, rbd_opts);
415         if (IS_ERR(ceph_opts)) {
416                 kfree(rbd_opts);
417                 return ERR_CAST(ceph_opts);
418         }
419
420         spin_lock(&rbd_client_list_lock);
421         rbdc = __rbd_client_find(ceph_opts);
422         if (rbdc) {
423                 /* using an existing client */
424                 kref_get(&rbdc->kref);
425                 spin_unlock(&rbd_client_list_lock);
426
427                 ceph_destroy_options(ceph_opts);
428                 kfree(rbd_opts);
429
430                 return rbdc;
431         }
432         spin_unlock(&rbd_client_list_lock);
433
434         rbdc = rbd_client_create(ceph_opts, rbd_opts);
435
436         if (IS_ERR(rbdc))
437                 kfree(rbd_opts);
438
439         return rbdc;
440 }
441
/*
 * Destroy ceph client
 *
 * kref release callback, invoked through kref_put() in rbd_put_client().
 * Takes rbd_client_list_lock itself to unlink the client, so the caller
 * must NOT hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        /* the lock is dropped before the client and its options are torn down */
        ceph_destroy_client(rbdc->client);
        kfree(rbdc->rbd_opts);
        kfree(rbdc);
}
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494                                  struct rbd_image_header_ondisk *ondisk,
495                                  u32 allocated_snaps)
496 {
497         u32 i, snap_count;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
504                          / sizeof (*ondisk))
505                 return -EINVAL;
506         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507                                 snap_count * sizeof(u64),
508                                 GFP_KERNEL);
509         if (!header->snapc)
510                 return -ENOMEM;
511
512         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
513         if (snap_count) {
514                 header->snap_names = kmalloc(header->snap_names_len,
515                                              GFP_KERNEL);
516                 if (!header->snap_names)
517                         goto err_snapc;
518                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519                                              GFP_KERNEL);
520                 if (!header->snap_sizes)
521                         goto err_names;
522         } else {
523                 header->snap_names = NULL;
524                 header->snap_sizes = NULL;
525         }
526
527         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
528                                         GFP_KERNEL);
529         if (!header->object_prefix)
530                 goto err_sizes;
531
532         memcpy(header->object_prefix, ondisk->block_name,
533                sizeof(ondisk->block_name));
534         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
535
536         header->image_size = le64_to_cpu(ondisk->image_size);
537         header->obj_order = ondisk->options.order;
538         header->crypt_type = ondisk->options.crypt_type;
539         header->comp_type = ondisk->options.comp_type;
540
541         atomic_set(&header->snapc->nref, 1);
542         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
543         header->snapc->num_snaps = snap_count;
544         header->total_snaps = snap_count;
545
546         if (snap_count && allocated_snaps == snap_count) {
547                 for (i = 0; i < snap_count; i++) {
548                         header->snapc->snaps[i] =
549                                 le64_to_cpu(ondisk->snaps[i].id);
550                         header->snap_sizes[i] =
551                                 le64_to_cpu(ondisk->snaps[i].image_size);
552                 }
553
554                 /* copy snapshot names */
555                 memcpy(header->snap_names, &ondisk->snaps[i],
556                         header->snap_names_len);
557         }
558
559         return 0;
560
561 err_sizes:
562         kfree(header->snap_sizes);
563 err_names:
564         kfree(header->snap_names);
565 err_snapc:
566         kfree(header->snapc);
567         return -ENOMEM;
568 }
569
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571                         u64 *seq, u64 *size)
572 {
573         int i;
574         char *p = header->snap_names;
575
576         for (i = 0; i < header->total_snaps; i++) {
577                 if (!strcmp(snap_name, p)) {
578
579                         /* Found it.  Pass back its id and/or size */
580
581                         if (seq)
582                                 *seq = header->snapc->snaps[i];
583                         if (size)
584                                 *size = header->snap_sizes[i];
585                         return i;
586                 }
587                 p += strlen(p) + 1;     /* Skip ahead to the next name */
588         }
589         return -ENOENT;
590 }
591
592 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
593 {
594         int ret;
595
596         down_write(&rbd_dev->header_rwsem);
597
598         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
599                     sizeof (RBD_SNAP_HEAD_NAME))) {
600                 rbd_dev->snap_id = CEPH_NOSNAP;
601                 rbd_dev->snap_exists = false;
602                 rbd_dev->read_only = 0;
603                 if (size)
604                         *size = rbd_dev->header.image_size;
605         } else {
606                 u64 snap_id = 0;
607
608                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
609                                         &snap_id, size);
610                 if (ret < 0)
611                         goto done;
612                 rbd_dev->snap_id = snap_id;
613                 rbd_dev->snap_exists = true;
614                 rbd_dev->read_only = 1;
615         }
616
617         ret = 0;
618 done:
619         up_write(&rbd_dev->header_rwsem);
620         return ret;
621 }
622
623 static void rbd_header_free(struct rbd_image_header *header)
624 {
625         kfree(header->object_prefix);
626         kfree(header->snap_sizes);
627         kfree(header->snap_names);
628         ceph_put_snap_context(header->snapc);
629 }
630
631 /*
632  * get the actual striped segment name, offset and length
633  */
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635                            const char *object_prefix,
636                            u64 ofs, u64 len,
637                            char *seg_name, u64 *segofs)
638 {
639         u64 seg = ofs >> header->obj_order;
640
641         if (seg_name)
642                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643                          "%s.%012llx", object_prefix, seg);
644
645         ofs = ofs & ((1 << header->obj_order) - 1);
646         len = min_t(u64, len, (1 << header->obj_order) - ofs);
647
648         if (segofs)
649                 *segofs = ofs;
650
651         return len;
652 }
653
654 static int rbd_get_num_segments(struct rbd_image_header *header,
655                                 u64 ofs, u64 len)
656 {
657         u64 start_seg = ofs >> header->obj_order;
658         u64 end_seg = (ofs + len - 1) >> header->obj_order;
659         return end_seg - start_seg + 1;
660 }
661
662 /*
663  * returns the size of an object in the image
664  */
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
666 {
667         return 1 << header->obj_order;
668 }
669
670 /*
671  * bio helpers
672  */
673
674 static void bio_chain_put(struct bio *chain)
675 {
676         struct bio *tmp;
677
678         while (chain) {
679                 tmp = chain;
680                 chain = chain->bi_next;
681                 bio_put(tmp);
682         }
683 }
684
685 /*
686  * zeros a bio chain, starting at specific offset
687  */
688 static void zero_bio_chain(struct bio *chain, int start_ofs)
689 {
690         struct bio_vec *bv;
691         unsigned long flags;
692         void *buf;
693         int i;
694         int pos = 0;
695
696         while (chain) {
697                 bio_for_each_segment(bv, chain, i) {
698                         if (pos + bv->bv_len > start_ofs) {
699                                 int remainder = max(start_ofs - pos, 0);
700                                 buf = bvec_kmap_irq(bv, &flags);
701                                 memset(buf + remainder, 0,
702                                        bv->bv_len - remainder);
703                                 bvec_kunmap_irq(buf, &flags);
704                         }
705                         pos += bv->bv_len;
706                 }
707
708                 chain = chain->bi_next;
709         }
710 }
711
712 /*
713  * bio_chain_clone - clone a chain of bios up to a certain length.
714  * might return a bio_pair that will need to be released.
715  */
716 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717                                    struct bio_pair **bp,
718                                    int len, gfp_t gfpmask)
719 {
720         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
721         int total = 0;
722
723         if (*bp) {
724                 bio_pair_release(*bp);
725                 *bp = NULL;
726         }
727
728         while (old_chain && (total < len)) {
729                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
730                 if (!tmp)
731                         goto err_out;
732
733                 if (total + old_chain->bi_size > len) {
734                         struct bio_pair *bp;
735
736                         /*
737                          * this split can only happen with a single paged bio,
738                          * split_bio will BUG_ON if this is not the case
739                          */
740                         dout("bio_chain_clone split! total=%d remaining=%d"
741                              "bi_size=%u\n",
742                              total, len - total, old_chain->bi_size);
743
744                         /* split the bio. We'll release it either in the next
745                            call, or it will have to be released outside */
746                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
747                         if (!bp)
748                                 goto err_out;
749
750                         __bio_clone(tmp, &bp->bio1);
751
752                         *next = &bp->bio2;
753                 } else {
754                         __bio_clone(tmp, old_chain);
755                         *next = old_chain->bi_next;
756                 }
757
758                 tmp->bi_bdev = NULL;
759                 gfpmask &= ~__GFP_WAIT;
760                 tmp->bi_next = NULL;
761
762                 if (!new_chain) {
763                         new_chain = tail = tmp;
764                 } else {
765                         tail->bi_next = tmp;
766                         tail = tmp;
767                 }
768                 old_chain = old_chain->bi_next;
769
770                 total += tmp->bi_size;
771         }
772
773         BUG_ON(total < len);
774
775         if (tail)
776                 tail->bi_next = NULL;
777
778         *old = old_chain;
779
780         return new_chain;
781
782 err_out:
783         dout("bio_chain_clone with err\n");
784         bio_chain_put(new_chain);
785         return NULL;
786 }
787
788 /*
789  * helpers for osd request op vectors.
790  */
791 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
792                                         int opcode, u32 payload_len)
793 {
794         struct ceph_osd_req_op *ops;
795
796         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
797         if (!ops)
798                 return NULL;
799
800         ops[0].op = opcode;
801
802         /*
803          * op extent offset and length will be set later on
804          * in calc_raw_layout()
805          */
806         ops[0].payload_len = payload_len;
807
808         return ops;
809 }
810
/* Free an op vector allocated by rbd_create_rw_ops(); NULL is accepted */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
815
816 static void rbd_coll_end_req_index(struct request *rq,
817                                    struct rbd_req_coll *coll,
818                                    int index,
819                                    int ret, u64 len)
820 {
821         struct request_queue *q;
822         int min, max, i;
823
824         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
825              coll, index, ret, (unsigned long long) len);
826
827         if (!rq)
828                 return;
829
830         if (!coll) {
831                 blk_end_request(rq, ret, len);
832                 return;
833         }
834
835         q = rq->q;
836
837         spin_lock_irq(q->queue_lock);
838         coll->status[index].done = 1;
839         coll->status[index].rc = ret;
840         coll->status[index].bytes = len;
841         max = min = coll->num_done;
842         while (max < coll->total && coll->status[max].done)
843                 max++;
844
845         for (i = min; i<max; i++) {
846                 __blk_end_request(rq, coll->status[i].rc,
847                                   coll->status[i].bytes);
848                 coll->num_done++;
849                 kref_put(&coll->kref, rbd_coll_release);
850         }
851         spin_unlock_irq(q->queue_lock);
852 }
853
854 static void rbd_coll_end_req(struct rbd_request *req,
855                              int ret, u64 len)
856 {
857         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
858 }
859
860 /*
861  * Send ceph osd request
862  */
863 static int rbd_do_request(struct request *rq,
864                           struct rbd_device *rbd_dev,
865                           struct ceph_snap_context *snapc,
866                           u64 snapid,
867                           const char *object_name, u64 ofs, u64 len,
868                           struct bio *bio,
869                           struct page **pages,
870                           int num_pages,
871                           int flags,
872                           struct ceph_osd_req_op *ops,
873                           struct rbd_req_coll *coll,
874                           int coll_index,
875                           void (*rbd_cb)(struct ceph_osd_request *req,
876                                          struct ceph_msg *msg),
877                           struct ceph_osd_request **linger_req,
878                           u64 *ver)
879 {
880         struct ceph_osd_request *req;
881         struct ceph_file_layout *layout;
882         int ret;
883         u64 bno;
884         struct timespec mtime = CURRENT_TIME;
885         struct rbd_request *req_data;
886         struct ceph_osd_request_head *reqhead;
887         struct ceph_osd_client *osdc;
888
889         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
890         if (!req_data) {
891                 if (coll)
892                         rbd_coll_end_req_index(rq, coll, coll_index,
893                                                -ENOMEM, len);
894                 return -ENOMEM;
895         }
896
897         if (coll) {
898                 req_data->coll = coll;
899                 req_data->coll_index = coll_index;
900         }
901
902         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
903                 (unsigned long long) ofs, (unsigned long long) len);
904
905         osdc = &rbd_dev->rbd_client->client->osdc;
906         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907                                         false, GFP_NOIO, pages, bio);
908         if (!req) {
909                 ret = -ENOMEM;
910                 goto done_pages;
911         }
912
913         req->r_callback = rbd_cb;
914
915         req_data->rq = rq;
916         req_data->bio = bio;
917         req_data->pages = pages;
918         req_data->len = len;
919
920         req->r_priv = req_data;
921
922         reqhead = req->r_request->front.iov_base;
923         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
925         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
926         req->r_oid_len = strlen(req->r_oid);
927
928         layout = &req->r_file_layout;
929         memset(layout, 0, sizeof(*layout));
930         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931         layout->fl_stripe_count = cpu_to_le32(1);
932         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
934         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935                                 req, ops);
936
937         ceph_osdc_build_request(req, ofs, &len,
938                                 ops,
939                                 snapc,
940                                 &mtime,
941                                 req->r_oid, req->r_oid_len);
942
943         if (linger_req) {
944                 ceph_osdc_set_request_linger(osdc, req);
945                 *linger_req = req;
946         }
947
948         ret = ceph_osdc_start_request(osdc, req, false);
949         if (ret < 0)
950                 goto done_err;
951
952         if (!rbd_cb) {
953                 ret = ceph_osdc_wait_request(osdc, req);
954                 if (ver)
955                         *ver = le64_to_cpu(req->r_reassert_version.version);
956                 dout("reassert_ver=%llu\n",
957                         (unsigned long long)
958                                 le64_to_cpu(req->r_reassert_version.version));
959                 ceph_osdc_put_request(req);
960         }
961         return ret;
962
963 done_err:
964         bio_chain_put(req_data->bio);
965         ceph_osdc_put_request(req);
966 done_pages:
967         rbd_coll_end_req(req_data, ret, len);
968         kfree(req_data);
969         return ret;
970 }
971
972 /*
973  * Ceph osd op callback
974  */
975 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
976 {
977         struct rbd_request *req_data = req->r_priv;
978         struct ceph_osd_reply_head *replyhead;
979         struct ceph_osd_op *op;
980         __s32 rc;
981         u64 bytes;
982         int read_op;
983
984         /* parse reply */
985         replyhead = msg->front.iov_base;
986         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
987         op = (void *)(replyhead + 1);
988         rc = le32_to_cpu(replyhead->result);
989         bytes = le64_to_cpu(op->extent.length);
990         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
991
992         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
993                 (unsigned long long) bytes, read_op, (int) rc);
994
995         if (rc == -ENOENT && read_op) {
996                 zero_bio_chain(req_data->bio, 0);
997                 rc = 0;
998         } else if (rc == 0 && read_op && bytes < req_data->len) {
999                 zero_bio_chain(req_data->bio, bytes);
1000                 bytes = req_data->len;
1001         }
1002
1003         rbd_coll_end_req(req_data, rc, bytes);
1004
1005         if (req_data->bio)
1006                 bio_chain_put(req_data->bio);
1007
1008         ceph_osdc_put_request(req);
1009         kfree(req_data);
1010 }
1011
/*
 * Minimal OSD completion callback: drops a reference on the request
 * and does nothing else.  The reply message is not examined.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1016
1017 /*
1018  * Do a synchronous ceph osd operation
1019  */
1020 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1021                            struct ceph_snap_context *snapc,
1022                            u64 snapid,
1023                            int opcode,
1024                            int flags,
1025                            struct ceph_osd_req_op *orig_ops,
1026                            const char *object_name,
1027                            u64 ofs, u64 len,
1028                            char *buf,
1029                            struct ceph_osd_request **linger_req,
1030                            u64 *ver)
1031 {
1032         int ret;
1033         struct page **pages;
1034         int num_pages;
1035         struct ceph_osd_req_op *ops = orig_ops;
1036         u32 payload_len;
1037
1038         num_pages = calc_pages_for(ofs , len);
1039         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1040         if (IS_ERR(pages))
1041                 return PTR_ERR(pages);
1042
1043         if (!orig_ops) {
1044                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1045                 ret = -ENOMEM;
1046                 ops = rbd_create_rw_ops(1, opcode, payload_len);
1047                 if (!ops)
1048                         goto done;
1049
1050                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1051                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1052                         if (ret < 0)
1053                                 goto done_ops;
1054                 }
1055         }
1056
1057         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1058                           object_name, ofs, len, NULL,
1059                           pages, num_pages,
1060                           flags,
1061                           ops,
1062                           NULL, 0,
1063                           NULL,
1064                           linger_req, ver);
1065         if (ret < 0)
1066                 goto done_ops;
1067
1068         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1069                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1070
1071 done_ops:
1072         if (!orig_ops)
1073                 rbd_destroy_ops(ops);
1074 done:
1075         ceph_release_page_vector(pages, num_pages);
1076         return ret;
1077 }
1078
1079 /*
1080  * Do an asynchronous ceph osd operation
1081  */
1082 static int rbd_do_op(struct request *rq,
1083                      struct rbd_device *rbd_dev,
1084                      struct ceph_snap_context *snapc,
1085                      u64 snapid,
1086                      int opcode, int flags,
1087                      u64 ofs, u64 len,
1088                      struct bio *bio,
1089                      struct rbd_req_coll *coll,
1090                      int coll_index)
1091 {
1092         char *seg_name;
1093         u64 seg_ofs;
1094         u64 seg_len;
1095         int ret;
1096         struct ceph_osd_req_op *ops;
1097         u32 payload_len;
1098
1099         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1100         if (!seg_name)
1101                 return -ENOMEM;
1102
1103         seg_len = rbd_get_segment(&rbd_dev->header,
1104                                   rbd_dev->header.object_prefix,
1105                                   ofs, len,
1106                                   seg_name, &seg_ofs);
1107
1108         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1109
1110         ret = -ENOMEM;
1111         ops = rbd_create_rw_ops(1, opcode, payload_len);
1112         if (!ops)
1113                 goto done;
1114
1115         /* we've taken care of segment sizes earlier when we
1116            cloned the bios. We should never have a segment
1117            truncated at this point */
1118         BUG_ON(seg_len < len);
1119
1120         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121                              seg_name, seg_ofs, seg_len,
1122                              bio,
1123                              NULL, 0,
1124                              flags,
1125                              ops,
1126                              coll, coll_index,
1127                              rbd_req_cb, 0, NULL);
1128
1129         rbd_destroy_ops(ops);
1130 done:
1131         kfree(seg_name);
1132         return ret;
1133 }
1134
1135 /*
1136  * Request async osd write
1137  */
1138 static int rbd_req_write(struct request *rq,
1139                          struct rbd_device *rbd_dev,
1140                          struct ceph_snap_context *snapc,
1141                          u64 ofs, u64 len,
1142                          struct bio *bio,
1143                          struct rbd_req_coll *coll,
1144                          int coll_index)
1145 {
1146         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1147                          CEPH_OSD_OP_WRITE,
1148                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1149                          ofs, len, bio, coll, coll_index);
1150 }
1151
1152 /*
1153  * Request async osd read
1154  */
1155 static int rbd_req_read(struct request *rq,
1156                          struct rbd_device *rbd_dev,
1157                          u64 snapid,
1158                          u64 ofs, u64 len,
1159                          struct bio *bio,
1160                          struct rbd_req_coll *coll,
1161                          int coll_index)
1162 {
1163         return rbd_do_op(rq, rbd_dev, NULL,
1164                          snapid,
1165                          CEPH_OSD_OP_READ,
1166                          CEPH_OSD_FLAG_READ,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174                           u64 snapid,
1175                           const char *object_name,
1176                           u64 ofs, u64 len,
1177                           char *buf,
1178                           u64 *ver)
1179 {
1180         return rbd_req_sync_op(rbd_dev, NULL,
1181                                snapid,
1182                                CEPH_OSD_OP_READ,
1183                                CEPH_OSD_FLAG_READ,
1184                                NULL,
1185                                object_name, ofs, len, buf, NULL, ver);
1186 }
1187
1188 /*
1189  * Request sync osd watch
1190  */
1191 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1192                                    u64 ver,
1193                                    u64 notify_id)
1194 {
1195         struct ceph_osd_req_op *ops;
1196         int ret;
1197
1198         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1199         if (!ops)
1200                 return -ENOMEM;
1201
1202         ops[0].watch.ver = cpu_to_le64(ver);
1203         ops[0].watch.cookie = notify_id;
1204         ops[0].watch.flag = 0;
1205
1206         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1207                           rbd_dev->header_name, 0, 0, NULL,
1208                           NULL, 0,
1209                           CEPH_OSD_FLAG_READ,
1210                           ops,
1211                           NULL, 0,
1212                           rbd_simple_req_cb, 0, NULL);
1213
1214         rbd_destroy_ops(ops);
1215         return ret;
1216 }
1217
1218 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1219 {
1220         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1221         u64 hver;
1222         int rc;
1223
1224         if (!rbd_dev)
1225                 return;
1226
1227         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1228                 rbd_dev->header_name, (unsigned long long) notify_id,
1229                 (unsigned int) opcode);
1230         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231         rc = __rbd_refresh_header(rbd_dev);
1232         hver = rbd_dev->header.obj_version;
1233         mutex_unlock(&ctl_mutex);
1234         if (rc)
1235                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236                            " update snaps: %d\n", rbd_dev->major, rc);
1237
1238         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1239 }
1240
1241 /*
1242  * Request sync osd watch
1243  */
1244 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1245 {
1246         struct ceph_osd_req_op *ops;
1247         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1248         int ret;
1249
1250         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1251         if (!ops)
1252                 return -ENOMEM;
1253
1254         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1255                                      (void *)rbd_dev, &rbd_dev->watch_event);
1256         if (ret < 0)
1257                 goto fail;
1258
1259         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1260         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1261         ops[0].watch.flag = 1;
1262
1263         ret = rbd_req_sync_op(rbd_dev, NULL,
1264                               CEPH_NOSNAP,
1265                               0,
1266                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1267                               ops,
1268                               rbd_dev->header_name,
1269                               0, 0, NULL,
1270                               &rbd_dev->watch_request, NULL);
1271
1272         if (ret < 0)
1273                 goto fail_event;
1274
1275         rbd_destroy_ops(ops);
1276         return 0;
1277
1278 fail_event:
1279         ceph_osdc_cancel_event(rbd_dev->watch_event);
1280         rbd_dev->watch_event = NULL;
1281 fail:
1282         rbd_destroy_ops(ops);
1283         return ret;
1284 }
1285
1286 /*
1287  * Request sync osd unwatch
1288  */
1289 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1290 {
1291         struct ceph_osd_req_op *ops;
1292         int ret;
1293
1294         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1295         if (!ops)
1296                 return -ENOMEM;
1297
1298         ops[0].watch.ver = 0;
1299         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1300         ops[0].watch.flag = 0;
1301
1302         ret = rbd_req_sync_op(rbd_dev, NULL,
1303                               CEPH_NOSNAP,
1304                               0,
1305                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1306                               ops,
1307                               rbd_dev->header_name,
1308                               0, 0, NULL, NULL, NULL);
1309
1310
1311         rbd_destroy_ops(ops);
1312         ceph_osdc_cancel_event(rbd_dev->watch_event);
1313         rbd_dev->watch_event = NULL;
1314         return ret;
1315 }
1316
/*
 * Small wrapper holding a pointer to the rbd device associated with a
 * notify operation.
 */
1317 struct rbd_notify_info {
1318         struct rbd_device *rbd_dev;
1319 };
1320
1321 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1322 {
1323         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1324         if (!rbd_dev)
1325                 return;
1326
1327         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1328                         rbd_dev->header_name, (unsigned long long) notify_id,
1329                         (unsigned int) opcode);
1330 }
1331
1332 /*
1333  * Request sync osd notify
1334  */
1335 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1336 {
1337         struct ceph_osd_req_op *ops;
1338         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1339         struct ceph_osd_event *event;
1340         struct rbd_notify_info info;
1341         int payload_len = sizeof(u32) + sizeof(u32);
1342         int ret;
1343
1344         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1345         if (!ops)
1346                 return -ENOMEM;
1347
1348         info.rbd_dev = rbd_dev;
1349
1350         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1351                                      (void *)&info, &event);
1352         if (ret < 0)
1353                 goto fail;
1354
1355         ops[0].watch.ver = 1;
1356         ops[0].watch.flag = 1;
1357         ops[0].watch.cookie = event->cookie;
1358         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1359         ops[0].watch.timeout = 12;
1360
1361         ret = rbd_req_sync_op(rbd_dev, NULL,
1362                                CEPH_NOSNAP,
1363                                0,
1364                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1365                                ops,
1366                                rbd_dev->header_name,
1367                                0, 0, NULL, NULL, NULL);
1368         if (ret < 0)
1369                 goto fail_event;
1370
1371         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1372         dout("ceph_osdc_wait_event returned %d\n", ret);
1373         rbd_destroy_ops(ops);
1374         return 0;
1375
1376 fail_event:
1377         ceph_osdc_cancel_event(event);
1378 fail:
1379         rbd_destroy_ops(ops);
1380         return ret;
1381 }
1382
1383 /*
1384  * Request sync osd read
1385  */
1386 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1387                              const char *object_name,
1388                              const char *class_name,
1389                              const char *method_name,
1390                              const char *data,
1391                              int len,
1392                              u64 *ver)
1393 {
1394         struct ceph_osd_req_op *ops;
1395         int class_name_len = strlen(class_name);
1396         int method_name_len = strlen(method_name);
1397         int ret;
1398
1399         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1400                                     class_name_len + method_name_len + len);
1401         if (!ops)
1402                 return -ENOMEM;
1403
1404         ops[0].cls.class_name = class_name;
1405         ops[0].cls.class_len = (__u8) class_name_len;
1406         ops[0].cls.method_name = method_name;
1407         ops[0].cls.method_len = (__u8) method_name_len;
1408         ops[0].cls.argc = 0;
1409         ops[0].cls.indata = data;
1410         ops[0].cls.indata_len = len;
1411
1412         ret = rbd_req_sync_op(rbd_dev, NULL,
1413                                CEPH_NOSNAP,
1414                                0,
1415                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1416                                ops,
1417                                object_name, 0, 0, NULL, NULL, ver);
1418
1419         rbd_destroy_ops(ops);
1420
1421         dout("cls_exec returned %d\n", ret);
1422         return ret;
1423 }
1424
1425 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1426 {
1427         struct rbd_req_coll *coll =
1428                         kzalloc(sizeof(struct rbd_req_coll) +
1429                                 sizeof(struct rbd_req_status) * num_reqs,
1430                                 GFP_ATOMIC);
1431
1432         if (!coll)
1433                 return NULL;
1434         coll->total = num_reqs;
1435         kref_init(&coll->kref);
1436         return coll;
1437 }
1438
1439 /*
1440  * block device queue callback
1441  */
1442 static void rbd_rq_fn(struct request_queue *q)
1443 {
1444         struct rbd_device *rbd_dev = q->queuedata;
1445         struct request *rq;
1446         struct bio_pair *bp = NULL;
1447
1448         while ((rq = blk_fetch_request(q))) {
1449                 struct bio *bio;
1450                 struct bio *rq_bio, *next_bio = NULL;
1451                 bool do_write;
1452                 unsigned int size;
1453                 u64 op_size = 0;
1454                 u64 ofs;
1455                 int num_segs, cur_seg = 0;
1456                 struct rbd_req_coll *coll;
1457                 struct ceph_snap_context *snapc;
1458
1459                 /* peek at request from block layer */
1460                 if (!rq)
1461                         break;
1462
1463                 dout("fetched request\n");
1464
1465                 /* filter out block requests we don't understand */
1466                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1467                         __blk_end_request_all(rq, 0);
1468                         continue;
1469                 }
1470
1471                 /* deduce our operation (read, write) */
1472                 do_write = (rq_data_dir(rq) == WRITE);
1473
1474                 size = blk_rq_bytes(rq);
1475                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1476                 rq_bio = rq->bio;
1477                 if (do_write && rbd_dev->read_only) {
1478                         __blk_end_request_all(rq, -EROFS);
1479                         continue;
1480                 }
1481
1482                 spin_unlock_irq(q->queue_lock);
1483
1484                 down_read(&rbd_dev->header_rwsem);
1485
1486                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1487                         up_read(&rbd_dev->header_rwsem);
1488                         dout("request for non-existent snapshot");
1489                         spin_lock_irq(q->queue_lock);
1490                         __blk_end_request_all(rq, -ENXIO);
1491                         continue;
1492                 }
1493
1494                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1495
1496                 up_read(&rbd_dev->header_rwsem);
1497
1498                 dout("%s 0x%x bytes at 0x%llx\n",
1499                      do_write ? "write" : "read",
1500                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1501
1502                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1503                 coll = rbd_alloc_coll(num_segs);
1504                 if (!coll) {
1505                         spin_lock_irq(q->queue_lock);
1506                         __blk_end_request_all(rq, -ENOMEM);
1507                         ceph_put_snap_context(snapc);
1508                         continue;
1509                 }
1510
1511                 do {
1512                         /* a bio clone to be passed down to OSD req */
1513                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1514                         op_size = rbd_get_segment(&rbd_dev->header,
1515                                                   rbd_dev->header.object_prefix,
1516                                                   ofs, size,
1517                                                   NULL, NULL);
1518                         kref_get(&coll->kref);
1519                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1520                                               op_size, GFP_ATOMIC);
1521                         if (!bio) {
1522                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1523                                                        -ENOMEM, op_size);
1524                                 goto next_seg;
1525                         }
1526
1527
1528                         /* init OSD command: write or read */
1529                         if (do_write)
1530                                 rbd_req_write(rq, rbd_dev,
1531                                               snapc,
1532                                               ofs,
1533                                               op_size, bio,
1534                                               coll, cur_seg);
1535                         else
1536                                 rbd_req_read(rq, rbd_dev,
1537                                              rbd_dev->snap_id,
1538                                              ofs,
1539                                              op_size, bio,
1540                                              coll, cur_seg);
1541
1542 next_seg:
1543                         size -= op_size;
1544                         ofs += op_size;
1545
1546                         cur_seg++;
1547                         rq_bio = next_bio;
1548                 } while (size > 0);
1549                 kref_put(&coll->kref, rbd_coll_release);
1550
1551                 if (bp)
1552                         bio_pair_release(bp);
1553                 spin_lock_irq(q->queue_lock);
1554
1555                 ceph_put_snap_context(snapc);
1556         }
1557 }
1558
1559 /*
1560  * a queue callback. Makes sure that we don't create a bio that spans across
1561  * multiple osd objects. One exception would be with a single page bios,
1562  * which we handle later at bio_chain_clone
1563  */
1564 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1565                           struct bio_vec *bvec)
1566 {
1567         struct rbd_device *rbd_dev = q->queuedata;
1568         unsigned int chunk_sectors;
1569         sector_t sector;
1570         unsigned int bio_sectors;
1571         int max;
1572
1573         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1574         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1575         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1576
1577         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1578                                  + bio_sectors)) << SECTOR_SHIFT;
1579         if (max < 0)
1580                 max = 0; /* bio_add cannot handle a negative return */
1581         if (max <= bvec->bv_len && bio_sectors == 0)
1582                 return bvec->bv_len;
1583         return max;
1584 }
1585
1586 static void rbd_free_disk(struct rbd_device *rbd_dev)
1587 {
1588         struct gendisk *disk = rbd_dev->disk;
1589
1590         if (!disk)
1591                 return;
1592
1593         rbd_header_free(&rbd_dev->header);
1594
1595         if (disk->flags & GENHD_FL_UP)
1596                 del_gendisk(disk);
1597         if (disk->queue)
1598                 blk_cleanup_queue(disk->queue);
1599         put_disk(disk);
1600 }
1601
1602 /*
1603  * reload the ondisk the header 
1604  */
1605 static int rbd_read_header(struct rbd_device *rbd_dev,
1606                            struct rbd_image_header *header)
1607 {
1608         ssize_t rc;
1609         struct rbd_image_header_ondisk *dh;
1610         u32 snap_count = 0;
1611         u64 ver;
1612         size_t len;
1613
1614         /*
1615          * First reads the fixed-size header to determine the number
1616          * of snapshots, then re-reads it, along with all snapshot
1617          * records as well as their stored names.
1618          */
1619         len = sizeof (*dh);
1620         while (1) {
1621                 dh = kmalloc(len, GFP_KERNEL);
1622                 if (!dh)
1623                         return -ENOMEM;
1624
1625                 rc = rbd_req_sync_read(rbd_dev,
1626                                        CEPH_NOSNAP,
1627                                        rbd_dev->header_name,
1628                                        0, len,
1629                                        (char *)dh, &ver);
1630                 if (rc < 0)
1631                         goto out_dh;
1632
1633                 rc = rbd_header_from_disk(header, dh, snap_count);
1634                 if (rc < 0) {
1635                         if (rc == -ENXIO)
1636                                 pr_warning("unrecognized header format"
1637                                            " for image %s\n",
1638                                            rbd_dev->image_name);
1639                         goto out_dh;
1640                 }
1641
1642                 if (snap_count == header->total_snaps)
1643                         break;
1644
1645                 snap_count = header->total_snaps;
1646                 len = sizeof (*dh) +
1647                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1648                         header->snap_names_len;
1649
1650                 rbd_header_free(header);
1651                 kfree(dh);
1652         }
1653         header->obj_version = ver;
1654
1655 out_dh:
1656         kfree(dh);
1657         return rc;
1658 }
1659
1660 /*
1661  * create a snapshot
1662  */
1663 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1664                                const char *snap_name,
1665                                gfp_t gfp_flags)
1666 {
1667         int name_len = strlen(snap_name);
1668         u64 new_snapid;
1669         int ret;
1670         void *data, *p, *e;
1671         u64 ver;
1672         struct ceph_mon_client *monc;
1673
1674         /* we should create a snapshot only if we're pointing at the head */
1675         if (rbd_dev->snap_id != CEPH_NOSNAP)
1676                 return -EINVAL;
1677
1678         monc = &rbd_dev->rbd_client->client->monc;
1679         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1680         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1681         if (ret < 0)
1682                 return ret;
1683
1684         data = kmalloc(name_len + 16, gfp_flags);
1685         if (!data)
1686                 return -ENOMEM;
1687
1688         p = data;
1689         e = data + name_len + 16;
1690
1691         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1692         ceph_encode_64_safe(&p, e, new_snapid, bad);
1693
1694         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1695                                 "rbd", "snap_add",
1696                                 data, p - data, &ver);
1697
1698         kfree(data);
1699
1700         return ret < 0 ? ret : 0;
1701 bad:
1702         return -ERANGE;
1703 }
1704
1705 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1706 {
1707         struct rbd_snap *snap;
1708         struct rbd_snap *next;
1709
1710         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1711                 __rbd_remove_snap_dev(snap);
1712 }
1713
1714 /*
1715  * only read the first part of the ondisk header, without the snaps info
1716  */
1717 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1718 {
1719         int ret;
1720         struct rbd_image_header h;
1721
1722         ret = rbd_read_header(rbd_dev, &h);
1723         if (ret < 0)
1724                 return ret;
1725
1726         down_write(&rbd_dev->header_rwsem);
1727
1728         /* resized? */
1729         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1730                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1731
1732                 dout("setting size to %llu sectors", (unsigned long long) size);
1733                 set_capacity(rbd_dev->disk, size);
1734         }
1735
1736         /* rbd_dev->header.object_prefix shouldn't change */
1737         kfree(rbd_dev->header.snap_sizes);
1738         kfree(rbd_dev->header.snap_names);
1739         /* osd requests may still refer to snapc */
1740         ceph_put_snap_context(rbd_dev->header.snapc);
1741
1742         rbd_dev->header.obj_version = h.obj_version;
1743         rbd_dev->header.image_size = h.image_size;
1744         rbd_dev->header.total_snaps = h.total_snaps;
1745         rbd_dev->header.snapc = h.snapc;
1746         rbd_dev->header.snap_names = h.snap_names;
1747         rbd_dev->header.snap_names_len = h.snap_names_len;
1748         rbd_dev->header.snap_sizes = h.snap_sizes;
1749         /* Free the extra copy of the object prefix */
1750         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1751         kfree(h.object_prefix);
1752
1753         ret = __rbd_init_snaps_header(rbd_dev);
1754
1755         up_write(&rbd_dev->header_rwsem);
1756
1757         return ret;
1758 }
1759
1760 static int rbd_init_disk(struct rbd_device *rbd_dev)
1761 {
1762         struct gendisk *disk;
1763         struct request_queue *q;
1764         int rc;
1765         u64 segment_size;
1766         u64 total_size = 0;
1767
1768         /* contact OSD, request size info about the object being mapped */
1769         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1770         if (rc)
1771                 return rc;
1772
1773         /* no need to lock here, as rbd_dev is not registered yet */
1774         rc = __rbd_init_snaps_header(rbd_dev);
1775         if (rc)
1776                 return rc;
1777
1778         rc = rbd_header_set_snap(rbd_dev, &total_size);
1779         if (rc)
1780                 return rc;
1781
1782         /* create gendisk info */
1783         rc = -ENOMEM;
1784         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1785         if (!disk)
1786                 goto out;
1787
1788         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1789                  rbd_dev->dev_id);
1790         disk->major = rbd_dev->major;
1791         disk->first_minor = 0;
1792         disk->fops = &rbd_bd_ops;
1793         disk->private_data = rbd_dev;
1794
1795         /* init rq */
1796         rc = -ENOMEM;
1797         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1798         if (!q)
1799                 goto out_disk;
1800
1801         /* We use the default size, but let's be explicit about it. */
1802         blk_queue_physical_block_size(q, SECTOR_SIZE);
1803
1804         /* set io sizes to object size */
1805         segment_size = rbd_obj_bytes(&rbd_dev->header);
1806         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1807         blk_queue_max_segment_size(q, segment_size);
1808         blk_queue_io_min(q, segment_size);
1809         blk_queue_io_opt(q, segment_size);
1810
1811         blk_queue_merge_bvec(q, rbd_merge_bvec);
1812         disk->queue = q;
1813
1814         q->queuedata = rbd_dev;
1815
1816         rbd_dev->disk = disk;
1817         rbd_dev->q = q;
1818
1819         /* finally, announce the disk to the world */
1820         set_capacity(disk, total_size / SECTOR_SIZE);
1821         add_disk(disk);
1822
1823         pr_info("%s: added with size 0x%llx\n",
1824                 disk->disk_name, (unsigned long long)total_size);
1825         return 0;
1826
1827 out_disk:
1828         put_disk(disk);
1829 out:
1830         return rc;
1831 }
1832
1833 /*
1834   sysfs
1835 */
1836
1837 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1838 {
1839         return container_of(dev, struct rbd_device, dev);
1840 }
1841
1842 static ssize_t rbd_size_show(struct device *dev,
1843                              struct device_attribute *attr, char *buf)
1844 {
1845         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1846         sector_t size;
1847
1848         down_read(&rbd_dev->header_rwsem);
1849         size = get_capacity(rbd_dev->disk);
1850         up_read(&rbd_dev->header_rwsem);
1851
1852         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1853 }
1854
1855 static ssize_t rbd_major_show(struct device *dev,
1856                               struct device_attribute *attr, char *buf)
1857 {
1858         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1859
1860         return sprintf(buf, "%d\n", rbd_dev->major);
1861 }
1862
1863 static ssize_t rbd_client_id_show(struct device *dev,
1864                                   struct device_attribute *attr, char *buf)
1865 {
1866         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1867
1868         return sprintf(buf, "client%lld\n",
1869                         ceph_client_id(rbd_dev->rbd_client->client));
1870 }
1871
1872 static ssize_t rbd_pool_show(struct device *dev,
1873                              struct device_attribute *attr, char *buf)
1874 {
1875         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876
1877         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1878 }
1879
1880 static ssize_t rbd_pool_id_show(struct device *dev,
1881                              struct device_attribute *attr, char *buf)
1882 {
1883         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884
1885         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1886 }
1887
1888 static ssize_t rbd_name_show(struct device *dev,
1889                              struct device_attribute *attr, char *buf)
1890 {
1891         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1892
1893         return sprintf(buf, "%s\n", rbd_dev->image_name);
1894 }
1895
1896 static ssize_t rbd_snap_show(struct device *dev,
1897                              struct device_attribute *attr,
1898                              char *buf)
1899 {
1900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901
1902         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1903 }
1904
1905 static ssize_t rbd_image_refresh(struct device *dev,
1906                                  struct device_attribute *attr,
1907                                  const char *buf,
1908                                  size_t size)
1909 {
1910         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911         int rc;
1912         int ret = size;
1913
1914         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1915
1916         rc = __rbd_refresh_header(rbd_dev);
1917         if (rc < 0)
1918                 ret = rc;
1919
1920         mutex_unlock(&ctl_mutex);
1921         return ret;
1922 }
1923
1924 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1925 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1926 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1927 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1928 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1929 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1930 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1931 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1932 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1933
1934 static struct attribute *rbd_attrs[] = {
1935         &dev_attr_size.attr,
1936         &dev_attr_major.attr,
1937         &dev_attr_client_id.attr,
1938         &dev_attr_pool.attr,
1939         &dev_attr_pool_id.attr,
1940         &dev_attr_name.attr,
1941         &dev_attr_current_snap.attr,
1942         &dev_attr_refresh.attr,
1943         &dev_attr_create_snap.attr,
1944         NULL
1945 };
1946
1947 static struct attribute_group rbd_attr_group = {
1948         .attrs = rbd_attrs,
1949 };
1950
1951 static const struct attribute_group *rbd_attr_groups[] = {
1952         &rbd_attr_group,
1953         NULL
1954 };
1955
1956 static void rbd_sysfs_dev_release(struct device *dev)
1957 {
1958 }
1959
1960 static struct device_type rbd_device_type = {
1961         .name           = "rbd",
1962         .groups         = rbd_attr_groups,
1963         .release        = rbd_sysfs_dev_release,
1964 };
1965
1966
1967 /*
1968   sysfs - snapshots
1969 */
1970
1971 static ssize_t rbd_snap_size_show(struct device *dev,
1972                                   struct device_attribute *attr,
1973                                   char *buf)
1974 {
1975         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1976
1977         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1978 }
1979
1980 static ssize_t rbd_snap_id_show(struct device *dev,
1981                                 struct device_attribute *attr,
1982                                 char *buf)
1983 {
1984         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1985
1986         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1987 }
1988
1989 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1990 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1991
1992 static struct attribute *rbd_snap_attrs[] = {
1993         &dev_attr_snap_size.attr,
1994         &dev_attr_snap_id.attr,
1995         NULL,
1996 };
1997
1998 static struct attribute_group rbd_snap_attr_group = {
1999         .attrs = rbd_snap_attrs,
2000 };
2001
2002 static void rbd_snap_dev_release(struct device *dev)
2003 {
2004         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2005         kfree(snap->name);
2006         kfree(snap);
2007 }
2008
2009 static const struct attribute_group *rbd_snap_attr_groups[] = {
2010         &rbd_snap_attr_group,
2011         NULL
2012 };
2013
2014 static struct device_type rbd_snap_device_type = {
2015         .groups         = rbd_snap_attr_groups,
2016         .release        = rbd_snap_dev_release,
2017 };
2018
2019 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2020 {
2021         list_del(&snap->node);
2022         device_unregister(&snap->dev);
2023 }
2024
2025 static int rbd_register_snap_dev(struct rbd_snap *snap,
2026                                   struct device *parent)
2027 {
2028         struct device *dev = &snap->dev;
2029         int ret;
2030
2031         dev->type = &rbd_snap_device_type;
2032         dev->parent = parent;
2033         dev->release = rbd_snap_dev_release;
2034         dev_set_name(dev, "snap_%s", snap->name);
2035         ret = device_register(dev);
2036
2037         return ret;
2038 }
2039
2040 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2041                                               int i, const char *name)
2042 {
2043         struct rbd_snap *snap;
2044         int ret;
2045
2046         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2047         if (!snap)
2048                 return ERR_PTR(-ENOMEM);
2049
2050         ret = -ENOMEM;
2051         snap->name = kstrdup(name, GFP_KERNEL);
2052         if (!snap->name)
2053                 goto err;
2054
2055         snap->size = rbd_dev->header.snap_sizes[i];
2056         snap->id = rbd_dev->header.snapc->snaps[i];
2057         if (device_is_registered(&rbd_dev->dev)) {
2058                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2059                 if (ret < 0)
2060                         goto err;
2061         }
2062
2063         return snap;
2064
2065 err:
2066         kfree(snap->name);
2067         kfree(snap);
2068
2069         return ERR_PTR(ret);
2070 }
2071
/*
 * Step backward through a list of '\0'-terminated names stored back to
 * back.  @name points at the start of an entry (or just past the last
 * one) and @start at the beginning of the list.  Returns the start of
 * the entry preceding @name, or NULL when fewer than two bytes lie
 * between @start and @name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/* Skip the previous entry's terminator, then scan back over it. */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		if (cursor == start)
			return start;
	}

	return cursor + 1;
}
2088
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly.  Note that the header holds the snapshots
 * in reverse order (from newest to oldest), and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing so (e.g., when a snapshot was removed and a new one was then
 * created with the same name).
 */
/*
 * Walks rbd_dev->snaps from its tail toward its head while the index i
 * walks the header's snapshot array downward from total_snaps, and
 * "name" walks the header's name buffer backward.
 *
 * Returns 0 on success, -EINVAL when the name buffer runs out before
 * the snapshot array does, or the error from __rbd_add_snap_dev().
 *
 * NOTE(review): on an error return, entries added or removed earlier in
 * the same call are not rolled back.
 */
2097 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2098 {
2099         const char *name, *first_name;
2100         int i = rbd_dev->header.total_snaps;
2101         struct rbd_snap *snap, *old_snap = NULL;
2102         struct list_head *p, *n;
2103
2104         first_name = rbd_dev->header.snap_names;
        /* start just past the end of the name buffer */
2105         name = first_name + rbd_dev->header.snap_names_len;
2106
2107         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2108                 u64 cur_id;
2109
2110                 old_snap = list_entry(p, struct rbd_snap, node);
2111
                /* cur_id stays unset when i == 0; the !i test below guards it */
2112                 if (i)
2113                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2114
2115                 if (!i || old_snap->id < cur_id) {
2116                         /*
2117                          * old_snap->id was skipped, thus was
2118                          * removed.  If this rbd_dev is mapped to
2119                          * the removed snapshot, record that it no
2120                          * longer exists, to prevent further I/O.
2121                          */
2122                         if (rbd_dev->snap_id == old_snap->id)
2123                                 rbd_dev->snap_exists = false;
2124                         __rbd_remove_snap_dev(old_snap);
2125                         continue;
2126                 }
2127                 if (old_snap->id == cur_id) {
2128                         /* we have this snapshot already */
2129                         i--;
2130                         name = rbd_prev_snap_name(name, first_name);
2131                         continue;
2132                 }
                /*
                 * NOTE(review): this loop reads snaps[i], while every other
                 * access in this function (and the __rbd_add_snap_dev() call
                 * below) uses index i - 1.  With i == total_snaps that is one
                 * element past the entries used elsewhere.  It also uses
                 * "name" without first stepping it back, unlike the trailing
                 * loop.  Looks like an off-by-one -- confirm before relying
                 * on this path.
                 */
2133                 for (; i > 0;
2134                      i--, name = rbd_prev_snap_name(name, first_name)) {
2135                         if (!name) {
2136                                 WARN_ON(1);
2137                                 return -EINVAL;
2138                         }
2139                         cur_id = rbd_dev->header.snapc->snaps[i];
2140                         /* snapshot removal? handle it above */
2141                         if (cur_id >= old_snap->id)
2142                                 break;
2143                         /* a new snapshot */
2144                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2145                         if (IS_ERR(snap))
2146                                 return PTR_ERR(snap);
2147
2148                         /* note that we add it backward so using n and not p */
2149                         list_add(&snap->node, n);
                        /* NOTE(review): presumably the iterator reloads p from n -- verify this store matters */
2150                         p = &snap->node;
2151                 }
2152         }
2153         /* we're done going over the old snap list, just add what's left */
2154         for (; i > 0; i--) {
2155                 name = rbd_prev_snap_name(name, first_name);
2156                 if (!name) {
2157                         WARN_ON(1);
2158                         return -EINVAL;
2159                 }
2160                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2161                 if (IS_ERR(snap))
2162                         return PTR_ERR(snap);
                /* each remaining entry is inserted at the head of the list */
2163                 list_add(&snap->node, &rbd_dev->snaps);
2164         }
2165
2166         return 0;
2167 }
2168
2169 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2170 {
2171         int ret;
2172         struct device *dev;
2173         struct rbd_snap *snap;
2174
2175         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2176         dev = &rbd_dev->dev;
2177
2178         dev->bus = &rbd_bus_type;
2179         dev->type = &rbd_device_type;
2180         dev->parent = &rbd_root_dev;
2181         dev->release = rbd_dev_release;
2182         dev_set_name(dev, "%d", rbd_dev->dev_id);
2183         ret = device_register(dev);
2184         if (ret < 0)
2185                 goto out;
2186
2187         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2188                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2189                 if (ret < 0)
2190                         break;
2191         }
2192 out:
2193         mutex_unlock(&ctl_mutex);
2194         return ret;
2195 }
2196
2197 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2198 {
2199         device_unregister(&rbd_dev->dev);
2200 }
2201
2202 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2203 {
2204         int ret, rc;
2205
2206         do {
2207                 ret = rbd_req_sync_watch(rbd_dev);
2208                 if (ret == -ERANGE) {
2209                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2210                         rc = __rbd_refresh_header(rbd_dev);
2211                         mutex_unlock(&ctl_mutex);
2212                         if (rc < 0)
2213                                 return rc;
2214                 }
2215         } while (ret == -ERANGE);
2216
2217         return ret;
2218 }
2219
2220 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2221
2222 /*
2223  * Get a unique rbd identifier for the given new rbd_dev, and add
2224  * the rbd_dev to the global list.  The minimum rbd id is 1.
2225  */
2226 static void rbd_id_get(struct rbd_device *rbd_dev)
2227 {
2228         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2229
2230         spin_lock(&rbd_dev_list_lock);
2231         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2232         spin_unlock(&rbd_dev_list_lock);
2233 }
2234
2235 /*
2236  * Remove an rbd_dev from the global list, and record that its
2237  * identifier is no longer in use.
2238  */
2239 static void rbd_id_put(struct rbd_device *rbd_dev)
2240 {
2241         struct list_head *tmp;
2242         int rbd_id = rbd_dev->dev_id;
2243         int max_id;
2244
2245         BUG_ON(rbd_id < 1);
2246
2247         spin_lock(&rbd_dev_list_lock);
2248         list_del_init(&rbd_dev->node);
2249
2250         /*
2251          * If the id being "put" is not the current maximum, there
2252          * is nothing special we need to do.
2253          */
2254         if (rbd_id != atomic64_read(&rbd_id_max)) {
2255                 spin_unlock(&rbd_dev_list_lock);
2256                 return;
2257         }
2258
2259         /*
2260          * We need to update the current maximum id.  Search the
2261          * list to find out what it is.  We're more likely to find
2262          * the maximum at the end, so search the list backward.
2263          */
2264         max_id = 0;
2265         list_for_each_prev(tmp, &rbd_dev_list) {
2266                 struct rbd_device *rbd_dev;
2267
2268                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2269                 if (rbd_id > max_id)
2270                         max_id = rbd_id;
2271         }
2272         spin_unlock(&rbd_dev_list_lock);
2273
2274         /*
2275          * The max id could have been updated by rbd_id_get(), in
2276          * which case it now accurately reflects the new maximum.
2277          * Be careful not to overwrite the maximum value in that
2278          * case.
2279          */
2280         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2281 }
2282
/*
 * Advances *buf past any leading white space so that it points at the
 * first non-space character (if any), and returns the length of the
 * token (run of non-white-space characters) that starts there.  The
 * string at *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";
	const char *token;

	token = *buf + strspn(*buf, spaces);
	*buf = token;

	return strcspn(token, spaces);
}
2301
/*
 * Locates the next token in *buf and, when it fits in the supplied
 * token buffer together with a terminating '\0', copies it there.
 * The string at *buf must be terminated with '\0' on entry.
 *
 * Returns the token's length (excluding the '\0'): 0 when no token was
 * found, and a value >= token_size when the token did not fit (in
 * which case nothing is written to the token buffer).
 *
 * *buf is always advanced past the token, whether or not it was
 * copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *src = *buf;

	*buf = src + len;

	if (len >= token_size)
		return len;

	memcpy(token, src, len);
	token[len] = '\0';

	return len;
}
2331
2332 /*
2333  * Finds the next token in *buf, dynamically allocates a buffer big
2334  * enough to hold a copy of it, and copies the token into the new
2335  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2336  * that a duplicate buffer is created even for a zero-length token.
2337  *
2338  * Returns a pointer to the newly-allocated duplicate, or a null
2339  * pointer if memory for the duplicate was not available.  If
2340  * the lenp argument is a non-null pointer, the length of the token
2341  * (not including the '\0') is returned in *lenp.
2342  *
2343  * If successful, the *buf pointer will be updated to point beyond
2344  * the end of the found token.
2345  *
2346  * Note: uses GFP_KERNEL for allocation.
2347  */
2348 static inline char *dup_token(const char **buf, size_t *lenp)
2349 {
2350         char *dup;
2351         size_t len;
2352
2353         len = next_token(buf);
2354         dup = kmalloc(len + 1, GFP_KERNEL);
2355         if (!dup)
2356                 return NULL;
2357
2358         memcpy(dup, *buf, len);
2359         *(dup + len) = '\0';
2360         *buf += len;
2361
2362         if (lenp)
2363                 *lenp = len;
2364
2365         return dup;
2366 }
2367
2368 /*
2369  * This fills in the pool_name, image_name, image_name_len, snap_name,
2370  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2371  * on the list of monitor addresses and other options provided via
2372  * /sys/bus/rbd/add.
2373  *
2374  * Note: rbd_dev is assumed to have been initially zero-filled.
2375  */
2376 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2377                               const char *buf,
2378                               const char **mon_addrs,
2379                               size_t *mon_addrs_size,
2380                               char *options,
2381                              size_t options_size)
2382 {
2383         size_t len;
2384         int ret;
2385
2386         /* The first four tokens are required */
2387
2388         len = next_token(&buf);
2389         if (!len)
2390                 return -EINVAL;
2391         *mon_addrs_size = len + 1;
2392         *mon_addrs = buf;
2393
2394         buf += len;
2395
2396         len = copy_token(&buf, options, options_size);
2397         if (!len || len >= options_size)
2398                 return -EINVAL;
2399
2400         ret = -ENOMEM;
2401         rbd_dev->pool_name = dup_token(&buf, NULL);
2402         if (!rbd_dev->pool_name)
2403                 goto out_err;
2404
2405         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2406         if (!rbd_dev->image_name)
2407                 goto out_err;
2408
2409         /* Create the name of the header object */
2410
2411         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2412                                                 + sizeof (RBD_SUFFIX),
2413                                         GFP_KERNEL);
2414         if (!rbd_dev->header_name)
2415                 goto out_err;
2416         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2417
2418         /*
2419          * The snapshot name is optional.  If none is is supplied,
2420          * we use the default value.
2421          */
2422         rbd_dev->snap_name = dup_token(&buf, &len);
2423         if (!rbd_dev->snap_name)
2424                 goto out_err;
2425         if (!len) {
2426                 /* Replace the empty name with the default */
2427                 kfree(rbd_dev->snap_name);
2428                 rbd_dev->snap_name
2429                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2430                 if (!rbd_dev->snap_name)
2431                         goto out_err;
2432
2433                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2434                         sizeof (RBD_SNAP_HEAD_NAME));
2435         }
2436
2437         return 0;
2438
2439 out_err:
2440         kfree(rbd_dev->header_name);
2441         kfree(rbd_dev->image_name);
2442         kfree(rbd_dev->pool_name);
2443         rbd_dev->pool_name = NULL;
2444
2445         return ret;
2446 }
2447
2448 static ssize_t rbd_add(struct bus_type *bus,
2449                        const char *buf,
2450                        size_t count)
2451 {
2452         char *options;
2453         struct rbd_device *rbd_dev = NULL;
2454         const char *mon_addrs = NULL;
2455         size_t mon_addrs_size = 0;
2456         struct ceph_osd_client *osdc;
2457         int rc = -ENOMEM;
2458
2459         if (!try_module_get(THIS_MODULE))
2460                 return -ENODEV;
2461
2462         options = kmalloc(count, GFP_KERNEL);
2463         if (!options)
2464                 goto err_nomem;
2465         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2466         if (!rbd_dev)
2467                 goto err_nomem;
2468
2469         /* static rbd_device initialization */
2470         spin_lock_init(&rbd_dev->lock);
2471         INIT_LIST_HEAD(&rbd_dev->node);
2472         INIT_LIST_HEAD(&rbd_dev->snaps);
2473         init_rwsem(&rbd_dev->header_rwsem);
2474
2475         /* generate unique id: find highest unique id, add one */
2476         rbd_id_get(rbd_dev);
2477
2478         /* Fill in the device name, now that we have its id. */
2479         BUILD_BUG_ON(DEV_NAME_LEN
2480                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2481         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2482
2483         /* parse add command */
2484         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2485                                 options, count);
2486         if (rc)
2487                 goto err_put_id;
2488
2489         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2490                                                 options);
2491         if (IS_ERR(rbd_dev->rbd_client)) {
2492                 rc = PTR_ERR(rbd_dev->rbd_client);
2493                 goto err_put_id;
2494         }
2495
2496         /* pick the pool */
2497         osdc = &rbd_dev->rbd_client->client->osdc;
2498         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2499         if (rc < 0)
2500                 goto err_out_client;
2501         rbd_dev->pool_id = rc;
2502
2503         /* register our block device */
2504         rc = register_blkdev(0, rbd_dev->name);
2505         if (rc < 0)
2506                 goto err_out_client;
2507         rbd_dev->major = rc;
2508
2509         rc = rbd_bus_add_dev(rbd_dev);
2510         if (rc)
2511                 goto err_out_blkdev;
2512
2513         /*
2514          * At this point cleanup in the event of an error is the job
2515          * of the sysfs code (initiated by rbd_bus_del_dev()).
2516          *
2517          * Set up and announce blkdev mapping.
2518          */
2519         rc = rbd_init_disk(rbd_dev);
2520         if (rc)
2521                 goto err_out_bus;
2522
2523         rc = rbd_init_watch_dev(rbd_dev);
2524         if (rc)
2525                 goto err_out_bus;
2526
2527         return count;
2528
2529 err_out_bus:
2530         /* this will also clean up rest of rbd_dev stuff */
2531
2532         rbd_bus_del_dev(rbd_dev);
2533         kfree(options);
2534         return rc;
2535
2536 err_out_blkdev:
2537         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2538 err_out_client:
2539         rbd_put_client(rbd_dev);
2540 err_put_id:
2541         if (rbd_dev->pool_name) {
2542                 kfree(rbd_dev->snap_name);
2543                 kfree(rbd_dev->header_name);
2544                 kfree(rbd_dev->image_name);
2545                 kfree(rbd_dev->pool_name);
2546         }
2547         rbd_id_put(rbd_dev);
2548 err_nomem:
2549         kfree(rbd_dev);
2550         kfree(options);
2551
2552         dout("Error adding device %s\n", buf);
2553         module_put(THIS_MODULE);
2554
2555         return (ssize_t) rc;
2556 }
2557
2558 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2559 {
2560         struct list_head *tmp;
2561         struct rbd_device *rbd_dev;
2562
2563         spin_lock(&rbd_dev_list_lock);
2564         list_for_each(tmp, &rbd_dev_list) {
2565                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2566                 if (rbd_dev->dev_id == dev_id) {
2567                         spin_unlock(&rbd_dev_list_lock);
2568                         return rbd_dev;
2569                 }
2570         }
2571         spin_unlock(&rbd_dev_list_lock);
2572         return NULL;
2573 }
2574
2575 static void rbd_dev_release(struct device *dev)
2576 {
2577         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2578
2579         if (rbd_dev->watch_request) {
2580                 struct ceph_client *client = rbd_dev->rbd_client->client;
2581
2582                 ceph_osdc_unregister_linger_request(&client->osdc,
2583                                                     rbd_dev->watch_request);
2584         }
2585         if (rbd_dev->watch_event)
2586                 rbd_req_sync_unwatch(rbd_dev);
2587
2588         rbd_put_client(rbd_dev);
2589
2590         /* clean up and free blkdev */
2591         rbd_free_disk(rbd_dev);
2592         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2593
2594         /* done with the id, and with the rbd_dev */
2595         kfree(rbd_dev->snap_name);
2596         kfree(rbd_dev->header_name);
2597         kfree(rbd_dev->pool_name);
2598         kfree(rbd_dev->image_name);
2599         rbd_id_put(rbd_dev);
2600         kfree(rbd_dev);
2601
2602         /* release module ref */
2603         module_put(THIS_MODULE);
2604 }
2605
2606 static ssize_t rbd_remove(struct bus_type *bus,
2607                           const char *buf,
2608                           size_t count)
2609 {
2610         struct rbd_device *rbd_dev = NULL;
2611         int target_id, rc;
2612         unsigned long ul;
2613         int ret = count;
2614
2615         rc = strict_strtoul(buf, 10, &ul);
2616         if (rc)
2617                 return rc;
2618
2619         /* convert to int; abort if we lost anything in the conversion */
2620         target_id = (int) ul;
2621         if (target_id != ul)
2622                 return -EINVAL;
2623
2624         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2625
2626         rbd_dev = __rbd_get_dev(target_id);
2627         if (!rbd_dev) {
2628                 ret = -ENOENT;
2629                 goto done;
2630         }
2631
2632         __rbd_remove_all_snaps(rbd_dev);
2633         rbd_bus_del_dev(rbd_dev);
2634
2635 done:
2636         mutex_unlock(&ctl_mutex);
2637         return ret;
2638 }
2639
2640 static ssize_t rbd_snap_add(struct device *dev,
2641                             struct device_attribute *attr,
2642                             const char *buf,
2643                             size_t count)
2644 {
2645         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2646         int ret;
2647         char *name = kmalloc(count + 1, GFP_KERNEL);
2648         if (!name)
2649                 return -ENOMEM;
2650
2651         snprintf(name, count, "%s", buf);
2652
2653         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2654
2655         ret = rbd_header_add_snap(rbd_dev,
2656                                   name, GFP_KERNEL);
2657         if (ret < 0)
2658                 goto err_unlock;
2659
2660         ret = __rbd_refresh_header(rbd_dev);
2661         if (ret < 0)
2662                 goto err_unlock;
2663
2664         /* shouldn't hold ctl_mutex when notifying.. notify might
2665            trigger a watch callback that would need to get that mutex */
2666         mutex_unlock(&ctl_mutex);
2667
2668         /* make a best effort, don't error if failed */
2669         rbd_req_sync_notify(rbd_dev);
2670
2671         ret = count;
2672         kfree(name);
2673         return ret;
2674
2675 err_unlock:
2676         mutex_unlock(&ctl_mutex);
2677         kfree(name);
2678         return ret;
2679 }
2680
2681 /*
2682  * create control files in sysfs
2683  * /sys/bus/rbd/...
2684  */
2685 static int rbd_sysfs_init(void)
2686 {
2687         int ret;
2688
2689         ret = device_register(&rbd_root_dev);
2690         if (ret < 0)
2691                 return ret;
2692
2693         ret = bus_register(&rbd_bus_type);
2694         if (ret < 0)
2695                 device_unregister(&rbd_root_dev);
2696
2697         return ret;
2698 }
2699
2700 static void rbd_sysfs_cleanup(void)
2701 {
2702         bus_unregister(&rbd_bus_type);
2703         device_unregister(&rbd_root_dev);
2704 }
2705
2706 int __init rbd_init(void)
2707 {
2708         int rc;
2709
2710         rc = rbd_sysfs_init();
2711         if (rc)
2712                 return rc;
2713         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2714         return 0;
2715 }
2716
2717 void __exit rbd_exit(void)
2718 {
2719         rbd_sysfs_cleanup();
2720 }
2721
2722 module_init(rbd_init);
2723 module_exit(rbd_exit);
2724
2725 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2726 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2727 MODULE_DESCRIPTION("rados block device");
2728
2729 /* following authorship retained from original osdblk.c */
2730 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2731
2732 MODULE_LICENSE("GPL");