rbd: add an object request flag for image data objects
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (0)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
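
/*
 * For example, with 4-byte ints the expression above gives
 * (5 * 4) / 2 + 1 = 11, enough for the 10 decimal digits of the
 * largest 32-bit id plus a terminating NUL: each byte contributes at
 * most log10(256) < 2.5 decimal digits.
 */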

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;

        u64             snap_id;
        char            *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
};
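
/*
 * These bits are manipulated atomically in rbd_obj_request->flags.
 * A sketch of the idiom used by the helpers defined further down
 * (obj_request_done_set() and friends):
 *
 *      if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags))
 *              ...     flag was already set: warn
 *      ...
 *      smp_mb();
 *      if (test_bit(OBJ_REQ_DONE, &obj_request->flags))
 *              ...     the request has completed
 */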

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        struct rbd_img_request  *img_request;
        u64                     img_offset;     /* image relative offset */
        struct list_head        links;          /* img_request->obj_requests */
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
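
/*
 * Typical traversal, as in rbd_img_request_complete() below:
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 *
 * The _safe variant walks the list in reverse and tolerates removal
 * of the current entry, which is what teardown paths such as
 * rbd_img_request_destroy() need.
 */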

struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        kref_get(&client_node->kref);
                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
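
/*
 * These tokens are taken from the option portion of the string
 * written to the bus "add" file (a sketch; the exact syntax is up to
 * userspace, e.g. the rbd tool):
 *
 *      <mon_addrs> name=admin,ro <pool> <image> <snap>
 *
 * "ro"/"read_only" and "rw"/"read_write" toggle rbd_options.read_only
 * below; tokens that match nothing in the table are rejected by
 * parse_rbd_opts_token().
 */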

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Takes rbd_client_list_lock to remove the client from the client
 * list, so the caller must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
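
/*
 * Concretely: on a 32-bit build SIZE_MAX is about 4 GiB, so the
 * snap_count check above caps the snapshot id array at roughly half
 * a billion 8-byte entries, and the snap_names_len check ensures the
 * ids plus the name blob together still fit in a size_t.  On 64-bit
 * builds the checks are effectively never triggered but keep the
 * arithmetic well defined.
 */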

/*
 * Create a new header structure, translating the header format from
 * the on-disk version.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* free object_prefix so this error path doesn't leak */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->spec->snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        int ret;

        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
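
/*
 * Worked example (assuming the common obj_order of 22, i.e. 4 MiB
 * objects): an image byte range starting at offset 0x3ff000 with
 * length 0x2000 crosses a segment boundary, so
 *
 *      rbd_segment_name()   -> "<object_prefix>.000000000000"
 *      rbd_segment_offset() -> 0x3ff000
 *      rbd_segment_length() -> 0x1000  (clipped at the 0x400000 edge)
 *
 * and the remaining 0x1000 bytes fall in segment 1.
 */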

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
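
/*
 * A sketch of the intended use (hypothetical caller, not from this
 * file): peeling one segment-sized clone off a request's bio chain
 * per object request:
 *
 *      struct bio *bio = rq->bio;
 *      unsigned int offset = 0;
 *
 *      while (...more segments...) {
 *              struct bio *clone;
 *
 *              clone = bio_chain_clone_range(&bio, &offset,
 *                                      segment_length, GFP_ATOMIC);
 *              ...attach clone to an object request...
 *      }
 *
 * Because bio and offset advance on each call, successive calls
 * consume the source chain front to back.
 */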

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_img_request *img_request = obj_request->img_request;
                struct rbd_device *rbd_dev;

                rbd_dev = img_request ? img_request->rbd_dev : NULL;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_img_request *img_request = obj_request->img_request;
                struct rbd_device *rbd_dev;

                rbd_dev = img_request ? img_request->rbd_dev : NULL;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}
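
/*
 * An informal note on ordering: test_and_set_bit() implies a full
 * memory barrier, so fields written before a flag is set are visible
 * once the bit is observed.  The smp_mb() in the _test() helpers
 * pairs with that, keeping the flag read from being reordered with
 * surrounding accesses.
 */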

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        rbd_obj_request_get(obj_request);
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better offhand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
        if (obj_request->result == -ENOENT) {
                zero_bio_chain(obj_request->bio_list, 0);
                obj_request->result = 0;
                obj_request->xferred = obj_request->length;
        } else if (obj_request->xferred < obj_request->length &&
                        !obj_request->result) {
                zero_bio_chain(obj_request->bio_list, obj_request->xferred);
                obj_request->xferred = obj_request->length;
        }
        obj_request_done_set(obj_request);
}
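
/*
 * For example: a 4 MiB read of an object that was never written gets
 * -ENOENT from the OSD, so the whole bio chain is zeroed and xferred
 * is reported as the full 4 MiB.  A short read that returned only
 * 1 MiB is zero-filled from byte 1 MiB to the end, with the same
 * full-length xferred result.
 */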

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
                obj_request->result, obj_request->xferred, obj_request->length);
        if (obj_request->img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.
         * Our xferred value is the number of bytes transferred
         * back.  Set it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        rbd_assert(obj_request_img_data_test(obj_request) ^
                                !obj_request->img_request);
        rbd_assert(obj_request_img_data_test(obj_request) ^
                                (obj_request->which == BAD_WHICH));

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        WARN_ON(osd_req->r_num_ops != 1);       /* For now */

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64) UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
                                        bool write_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc = NULL;
        u64 snap_id = CEPH_NOSNAP;
        struct timespec *mtime = NULL;
        struct timespec now;

        rbd_assert(osd_req != NULL);

        if (write_request) {
                now = CURRENT_TIME;
                mtime = &now;
                if (img_request)
                        snapc = img_request->snapc;
        } else if (img_request) {
                snap_id = img_request->snap_id;
        }
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, snap_id, mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
                                                u64 offset, u64 length,
                                                enum obj_request_type type)
{
        struct rbd_obj_request *obj_request;
        size_t size;
        char *name;

        rbd_assert(obj_request_type_valid(type));

        size = strlen(object_name) + 1;
        obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
        if (!obj_request)
                return NULL;

        name = (char *)(obj_request + 1);
        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
        obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);

        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
                offset, length, (int)type, obj_request);

        return obj_request;
}
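
/*
 * A rough sketch of a standalone object request's life cycle, as the
 * helpers here are meant to be combined (hypothetical caller, error
 * handling omitted):
 *
 *      obj_request = rbd_obj_request_create(name, off, len, type);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev,
 *                                      write_request, obj_request);
 *      rbd_osd_req_format(obj_request, write_request);
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 *      ret = rbd_obj_request_wait(obj_request);
 *      rbd_obj_request_put(obj_request);
 *
 * Requests that belong to an image request are instead linked in with
 * rbd_img_obj_request_add() and completed via callbacks.
 */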
1515
1516 static void rbd_obj_request_destroy(struct kref *kref)
1517 {
1518         struct rbd_obj_request *obj_request;
1519
1520         obj_request = container_of(kref, struct rbd_obj_request, kref);
1521
1522         dout("%s: obj %p\n", __func__, obj_request);
1523
1524         rbd_assert(obj_request->img_request == NULL);
1525         rbd_assert(obj_request->which == BAD_WHICH);
1526
1527         if (obj_request->osd_req)
1528                 rbd_osd_req_destroy(obj_request->osd_req);
1529
1530         rbd_assert(obj_request_type_valid(obj_request->type));
1531         switch (obj_request->type) {
1532         case OBJ_REQUEST_NODATA:
1533                 break;          /* Nothing to do */
1534         case OBJ_REQUEST_BIO:
1535                 if (obj_request->bio_list)
1536                         bio_chain_put(obj_request->bio_list);
1537                 break;
1538         case OBJ_REQUEST_PAGES:
1539                 if (obj_request->pages)
1540                         ceph_release_page_vector(obj_request->pages,
1541                                                 obj_request->page_count);
1542                 break;
1543         }
1544
1545         kfree(obj_request);
1546 }
1547
1548 /*
1549  * Caller is responsible for filling in the list of object requests
1550  * that comprises the image request, and the Linux request pointer
1551  * (if there is one).
1552  */
1553 static struct rbd_img_request *rbd_img_request_create(
1554                                         struct rbd_device *rbd_dev,
1555                                         u64 offset, u64 length,
1556                                         bool write_request,
1557                                         bool child_request)
1558 {
1559         struct rbd_img_request *img_request;
1560         struct ceph_snap_context *snapc = NULL;
1561
1562         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1563         if (!img_request)
1564                 return NULL;
1565
1566         if (write_request) {
1567                 down_read(&rbd_dev->header_rwsem);
1568                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1569                 up_read(&rbd_dev->header_rwsem);
1570                 if (WARN_ON(!snapc)) {
1571                         kfree(img_request);
1572                         return NULL;    /* Shouldn't happen */
1573                 }
1574
1575         }
1576
1577         img_request->rq = NULL;
1578         img_request->rbd_dev = rbd_dev;
1579         img_request->offset = offset;
1580         img_request->length = length;
1581         img_request->flags = 0;
1582         if (write_request) {
1583                 img_request_write_set(img_request);
1584                 img_request->snapc = snapc;
1585         } else {
1586                 img_request->snap_id = rbd_dev->spec->snap_id;
1587         }
1588         if (child_request)
1589                 img_request_child_set(img_request);
1590         if (rbd_dev->parent_spec)
1591                 img_request_layered_set(img_request);
1592         spin_lock_init(&img_request->completion_lock);
1593         img_request->next_completion = 0;
1594         img_request->callback = NULL;
1595         img_request->result = 0;
1596         img_request->obj_request_count = 0;
1597         INIT_LIST_HEAD(&img_request->obj_requests);
1598         kref_init(&img_request->kref);
1599
1600         (void) img_request_layered_test(img_request);   /* Avoid a warning */
1601         rbd_img_request_get(img_request);       /* Avoid a warning */
1602         rbd_img_request_put(img_request);       /* TEMPORARY */
1603
1604         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1605                 write_request ? "write" : "read", offset, length,
1606                 img_request);
1607
1608         return img_request;
1609 }
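
/*
 * A sketch of the typical call sequence (as in rbd_request_fn()
 * below): create the image request, fill it with object requests
 * covering the bio, then submit it:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	img_request->rq = rq;
 *	result = rbd_img_request_fill_bio(img_request, rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 */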
1610
1611 static void rbd_img_request_destroy(struct kref *kref)
1612 {
1613         struct rbd_img_request *img_request;
1614         struct rbd_obj_request *obj_request;
1615         struct rbd_obj_request *next_obj_request;
1616
1617         img_request = container_of(kref, struct rbd_img_request, kref);
1618
1619         dout("%s: img %p\n", __func__, img_request);
1620
1621         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1622                 rbd_img_obj_request_del(img_request, obj_request);
1623         rbd_assert(img_request->obj_request_count == 0);
1624
1625         if (img_request_write_test(img_request))
1626                 ceph_put_snap_context(img_request->snapc);
1627
1628         kfree(img_request);
1629 }
1630
1631 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1632 {
1633         struct rbd_img_request *img_request;
1634         unsigned int xferred;
1635         int result;
1636
1637         rbd_assert(obj_request_img_data_test(obj_request));
1638         img_request = obj_request->img_request;
1639
1640         rbd_assert(!img_request_child_test(img_request));
1641         rbd_assert(img_request->rq != NULL);
1642
1643         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1644         xferred = (unsigned int)obj_request->xferred;
1645         result = obj_request->result;
1646         if (result) {
1647                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1648
1649                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1650                         img_request_write_test(img_request) ? "write" : "read",
1651                         obj_request->length, obj_request->img_offset,
1652                         obj_request->offset);
1653                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1654                         result, xferred);
1655                 if (!img_request->result)
1656                         img_request->result = result;
1657         }
1658
1659         return blk_end_request(img_request->rq, result, xferred);
1660 }
1661
1662 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1663 {
1664         struct rbd_img_request *img_request;
1665         u32 which = obj_request->which;
1666         bool more = true;
1667
1668         rbd_assert(obj_request_img_data_test(obj_request));
1669         img_request = obj_request->img_request;
1670
1671         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1672         rbd_assert(img_request != NULL);
1673         rbd_assert(img_request->obj_request_count > 0);
1674         rbd_assert(which != BAD_WHICH);
1675         rbd_assert(which < img_request->obj_request_count);
1676         rbd_assert(which >= img_request->next_completion);
1677
1678         spin_lock_irq(&img_request->completion_lock);
1679         if (which != img_request->next_completion)
1680                 goto out;
1681
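        /*
         * Completions are delivered to the block layer in order.  A
         * request that finishes out of order simply returns above; the
         * request that *is* next then sweeps forward here through every
         * consecutive object request already marked done.
         */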
1682         for_each_obj_request_from(img_request, obj_request) {
1683                 rbd_assert(more);
1684                 rbd_assert(which < img_request->obj_request_count);
1685
1686                 if (!obj_request_done_test(obj_request))
1687                         break;
1688                 more = rbd_img_obj_end_request(obj_request);
1689                 which++;
1690         }
1691
1692         rbd_assert(more ^ (which == img_request->obj_request_count));
1693         img_request->next_completion = which;
1694 out:
1695         spin_unlock_irq(&img_request->completion_lock);
1696
1697         if (!more)
1698                 rbd_img_request_complete(img_request);
1699 }
1700
1701 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1702                                         struct bio *bio_list)
1703 {
1704         struct rbd_device *rbd_dev = img_request->rbd_dev;
1705         struct rbd_obj_request *obj_request = NULL;
1706         struct rbd_obj_request *next_obj_request;
1707         bool write_request = img_request_write_test(img_request);
1708         unsigned int bio_offset;
1709         u64 img_offset;
1710         u64 resid;
1711         u16 opcode;
1712
1713         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1714
1715         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1716         bio_offset = 0;
1717         img_offset = img_request->offset;
1718         rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1719         resid = img_request->length;
1720         rbd_assert(resid > 0);
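
        /*
         * Each pass through this loop maps the next piece of the image
         * request onto a single object.  For example, assuming the
         * default 4 MiB object size, a 6 MiB request starting at image
         * offset 3 MiB becomes three object requests: 1 MiB at offset
         * 3 MiB within its object, a full 4 MiB object, and 1 MiB at
         * offset 0 in the final object.
         */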
1721         while (resid) {
1722                 struct ceph_osd_request *osd_req;
1723                 const char *object_name;
1724                 unsigned int clone_size;
1725                 u64 offset;
1726                 u64 length;
1727
1728                 object_name = rbd_segment_name(rbd_dev, img_offset);
1729                 if (!object_name)
1730                         goto out_unwind;
1731                 offset = rbd_segment_offset(rbd_dev, img_offset);
1732                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1733                 obj_request = rbd_obj_request_create(object_name,
1734                                                 offset, length,
1735                                                 OBJ_REQUEST_BIO);
1736                 kfree(object_name);     /* object request has its own copy */
1737                 if (!obj_request)
1738                         goto out_unwind;
1739
1740                 rbd_assert(length <= (u64) UINT_MAX);
1741                 clone_size = (unsigned int) length;
1742                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1743                                                 &bio_offset, clone_size,
1744                                                 GFP_ATOMIC);
1745                 if (!obj_request->bio_list)
1746                         goto out_partial;
1747
1748                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1749                                                 obj_request);
1750                 if (!osd_req)
1751                         goto out_partial;
1752                 obj_request->osd_req = osd_req;
1753                 obj_request->callback = rbd_img_obj_callback;
1754
1755                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1756                                                 0, 0);
1757                 osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1758                                 obj_request->bio_list, obj_request->length);
1759                 rbd_osd_req_format(obj_request, write_request);
1760
1761                 obj_request->img_offset = img_offset;
1762                 rbd_img_obj_request_add(img_request, obj_request);
1763
1764                 img_offset += length;
1765                 resid -= length;
1766         }
1767
1768         return 0;
1769
1770 out_partial:
1771         rbd_obj_request_put(obj_request);
1772 out_unwind:
1773         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1774                 rbd_obj_request_put(obj_request);
1775
1776         return -ENOMEM;
1777 }
1778
1779 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1780 {
1781         struct rbd_device *rbd_dev = img_request->rbd_dev;
1782         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1783         struct rbd_obj_request *obj_request;
1784         struct rbd_obj_request *next_obj_request;
1785
1786         dout("%s: img %p\n", __func__, img_request);
1787         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1788                 int ret;
1789
1790                 ret = rbd_obj_request_submit(osdc, obj_request);
1791                 if (ret)
1792                         return ret;
1793                 /*
1794                  * The image request has its own reference to each
1795                  * of its object requests, so we can safely drop the
1796                  * initial one here.
1797                  */
1798                 rbd_obj_request_put(obj_request);
1799         }
1800
1801         return 0;
1802 }
1803
1804 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1805                                    u64 ver, u64 notify_id)
1806 {
1807         struct rbd_obj_request *obj_request;
1808         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1809         int ret;
1810
1811         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1812                                                         OBJ_REQUEST_NODATA);
1813         if (!obj_request)
1814                 return -ENOMEM;
1815
1816         ret = -ENOMEM;
1817         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1818         if (!obj_request->osd_req)
1819                 goto out;
1820         obj_request->callback = rbd_obj_request_put;
1821
1822         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1823                                         notify_id, ver, 0);
1824         rbd_osd_req_format(obj_request, false);
1825
1826         ret = rbd_obj_request_submit(osdc, obj_request);
1827 out:
1828         if (ret)
1829                 rbd_obj_request_put(obj_request);
1830
1831         return ret;
1832 }
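
/*
 * A sketch of the watch/notify flow: some other client updates the
 * header object and sends a notification to all watchers; in response,
 * rbd_watch_cb() below refreshes our cached header, then calls
 * rbd_obj_notify_ack() so the notifier is not left waiting for the
 * notify to time out.
 */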
1833
1834 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1835 {
1836         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1837         u64 hver;
1838         int rc;
1839
1840         if (!rbd_dev)
1841                 return;
1842
1843         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1844                 rbd_dev->header_name, (unsigned long long) notify_id,
1845                 (unsigned int) opcode);
1846         rc = rbd_dev_refresh(rbd_dev, &hver);
1847         if (rc)
1848                 rbd_warn(rbd_dev, "got notification but failed to "
1849                            "update snaps: %d\n", rc);
1850
1851         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1852 }
1853
1854 /*
1855  * Request sync osd watch/unwatch.  The value of "start" determines
1856  * whether a watch request is being initiated or torn down.
1857  */
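/*
 * Pairing sketch, assuming the image map and unmap paths are the only
 * callers: rbd_dev_header_watch_sync(rbd_dev, 1) registers a lingering
 * watch on the header object when the image is mapped, and a later
 * rbd_dev_header_watch_sync(rbd_dev, 0) call tears that watch down.
 */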
1858 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1859 {
1860         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1861         struct rbd_obj_request *obj_request;
1862         int ret;
1863
1864         rbd_assert(start ^ !!rbd_dev->watch_event);
1865         rbd_assert(start ^ !!rbd_dev->watch_request);
1866
1867         if (start) {
1868                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1869                                                 &rbd_dev->watch_event);
1870                 if (ret < 0)
1871                         return ret;
1872                 rbd_assert(rbd_dev->watch_event != NULL);
1873         }
1874
1875         ret = -ENOMEM;
1876         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1877                                                         OBJ_REQUEST_NODATA);
1878         if (!obj_request)
1879                 goto out_cancel;
1880
1881         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1882         if (!obj_request->osd_req)
1883                 goto out_cancel;
1884
1885         if (start)
1886                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1887         else
1888                 ceph_osdc_unregister_linger_request(osdc,
1889                                         rbd_dev->watch_request->osd_req);
1890
1891         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1892                                 rbd_dev->watch_event->cookie,
1893                                 rbd_dev->header.obj_version, start);
1894         rbd_osd_req_format(obj_request, true);
1895
1896         ret = rbd_obj_request_submit(osdc, obj_request);
1897         if (ret)
1898                 goto out_cancel;
1899         ret = rbd_obj_request_wait(obj_request);
1900         if (ret)
1901                 goto out_cancel;
1902         ret = obj_request->result;
1903         if (ret)
1904                 goto out_cancel;
1905
1906         /*
1907          * A watch request is set to linger, so the underlying osd
1908          * request won't go away until we unregister it.  We retain
1909          * a pointer to the object request during that time (in
1910          * rbd_dev->watch_request), so we'll keep a reference to
1911          * it.  We'll drop that reference (below) after we've
1912          * unregistered it.
1913          */
1914         if (start) {
1915                 rbd_dev->watch_request = obj_request;
1916
1917                 return 0;
1918         }
1919
1920         /* We have successfully torn down the watch request */
1921
1922         rbd_obj_request_put(rbd_dev->watch_request);
1923         rbd_dev->watch_request = NULL;
1924 out_cancel:
1925         /* Cancel the event if we're tearing down, or on error */
1926         ceph_osdc_cancel_event(rbd_dev->watch_event);
1927         rbd_dev->watch_event = NULL;
1928         if (obj_request)
1929                 rbd_obj_request_put(obj_request);
1930
1931         return ret;
1932 }
1933
1934 /*
1935  * Synchronous osd object method call
1936  */
1937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1938                              const char *object_name,
1939                              const char *class_name,
1940                              const char *method_name,
1941                              const char *outbound,
1942                              size_t outbound_size,
1943                              char *inbound,
1944                              size_t inbound_size,
1945                              u64 *version)
1946 {
1947         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1948         struct rbd_obj_request *obj_request;
1949         struct page **pages;
1950         u32 page_count;
1951         int ret;
1952
1953         /*
1954          * Method calls are ultimately read operations.  The result
1955          * should be placed into the inbound buffer provided.  They
1956          * also supply outbound data--parameters for the object
1957          * method.  Currently if this is present it will be a
1958          * snapshot id.
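         *
         * A representative caller is _rbd_dev_v2_snap_size() below: it
         * invokes the "get_size" method of the "rbd" class with a
         * little-endian snapshot id as its outbound data, and the
         * order/size pair comes back in the inbound buffer.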
1959          */
1960         page_count = (u32) calc_pages_for(0, inbound_size);
1961         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1962         if (IS_ERR(pages))
1963                 return PTR_ERR(pages);
1964
1965         ret = -ENOMEM;
1966         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1967                                                         OBJ_REQUEST_PAGES);
1968         if (!obj_request)
1969                 goto out;
1970
1971         obj_request->pages = pages;
1972         obj_request->page_count = page_count;
1973
1974         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1975         if (!obj_request->osd_req)
1976                 goto out;
1977
1978         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1979                                         class_name, method_name);
1980         if (outbound_size) {
1981                 struct ceph_pagelist *pagelist;
1982
1983                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1984                 if (!pagelist)
1985                         goto out;
1986
1987                 ceph_pagelist_init(pagelist);
1988                 ceph_pagelist_append(pagelist, outbound, outbound_size);
1989                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1990                                                 pagelist);
1991         }
1992         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1993                                         obj_request->pages, inbound_size,
1994                                         0, false, false);
1995         rbd_osd_req_format(obj_request, false);
1996
1997         ret = rbd_obj_request_submit(osdc, obj_request);
1998         if (ret)
1999                 goto out;
2000         ret = rbd_obj_request_wait(obj_request);
2001         if (ret)
2002                 goto out;
2003
2004         ret = obj_request->result;
2005         if (ret < 0)
2006                 goto out;
2007         ret = 0;
2008         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2009         if (version)
2010                 *version = obj_request->version;
2011 out:
2012         if (obj_request)
2013                 rbd_obj_request_put(obj_request);
2014         else
2015                 ceph_release_page_vector(pages, page_count);
2016
2017         return ret;
2018 }
2019
2020 static void rbd_request_fn(struct request_queue *q)
2021                 __releases(q->queue_lock) __acquires(q->queue_lock)
2022 {
2023         struct rbd_device *rbd_dev = q->queuedata;
2024         bool read_only = rbd_dev->mapping.read_only;
2025         struct request *rq;
2026         int result;
2027
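        /*
         * Each block-layer request is turned into one rbd_img_request,
         * which rbd_img_request_fill_bio() splits into per-object
         * requests; completions flow back through rbd_img_obj_callback()
         * and end the block request once the last object request is done.
         */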
2028         while ((rq = blk_fetch_request(q))) {
2029                 bool write_request = rq_data_dir(rq) == WRITE;
2030                 struct rbd_img_request *img_request;
2031                 u64 offset;
2032                 u64 length;
2033
2034                 /* Ignore any non-FS requests that filter through. */
2035
2036                 if (rq->cmd_type != REQ_TYPE_FS) {
2037                         dout("%s: non-fs request type %d\n", __func__,
2038                                 (int) rq->cmd_type);
2039                         __blk_end_request_all(rq, 0);
2040                         continue;
2041                 }
2042
2043                 /* Ignore/skip any zero-length requests */
2044
2045                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2046                 length = (u64) blk_rq_bytes(rq);
2047
2048                 if (!length) {
2049                         dout("%s: zero-length request\n", __func__);
2050                         __blk_end_request_all(rq, 0);
2051                         continue;
2052                 }
2053
2054                 spin_unlock_irq(q->queue_lock);
2055
2056                 /* Disallow writes to a read-only device */
2057
2058                 if (write_request) {
2059                         result = -EROFS;
2060                         if (read_only)
2061                                 goto end_request;
2062                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2063                 }
2064
2065                 /*
2066                  * Quit early if the mapped snapshot no longer
2067                  * exists.  It's still possible the snapshot will
2068                  * have disappeared by the time our request arrives
2069                  * at the osd, but there's no sense in sending it if
2070                  * we already know.
2071                  */
2072                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2073                         dout("request for non-existent snapshot\n");
2074                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2075                         result = -ENXIO;
2076                         goto end_request;
2077                 }
2078
2079                 result = -EINVAL;
2080                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2081                         goto end_request;       /* Shouldn't happen */
2082
2083                 result = -ENOMEM;
2084                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2085                                                         write_request, false);
2086                 if (!img_request)
2087                         goto end_request;
2088
2089                 img_request->rq = rq;
2090
2091                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2092                 if (!result)
2093                         result = rbd_img_request_submit(img_request);
2094                 if (result)
2095                         rbd_img_request_put(img_request);
2096 end_request:
2097                 spin_lock_irq(q->queue_lock);
2098                 if (result < 0) {
2099                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2100                                 write_request ? "write" : "read",
2101                                 length, offset, result);
2102
2103                         __blk_end_request_all(rq, result);
2104                 }
2105         }
2106 }
2107
2108 /*
2109  * A queue callback.  Makes sure that we don't create a bio that spans
2110  * across multiple osd objects.  One exception would be with single-page
2111  * bios, which we handle later at bio_chain_clone_range().
2112  */
2113 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2114                           struct bio_vec *bvec)
2115 {
2116         struct rbd_device *rbd_dev = q->queuedata;
2117         sector_t sector_offset;
2118         sector_t sectors_per_obj;
2119         sector_t obj_sector_offset;
2120         int ret;
2121
2122         /*
2123          * Convert the partition-relative bio start sector into an
2124          * offset relative to the enclosing device, then find how far
2125          * into its rbd object that offset falls.
2126          */
2127         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2128         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2129         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
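        /*
         * For illustration, assuming the default 4 MiB objects
         * (obj_order 22): sectors_per_obj is 8192, so a bio starting at
         * device sector 8200 sits 8 sectors (4 KiB) into its object,
         * with 8184 sectors of room left before the object boundary.
         */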
2130
2131         /*
2132          * Compute the number of bytes from that offset to the end
2133          * of the object.  Account for what's already used by the bio.
2134          */
2135         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2136         if (ret > bmd->bi_size)
2137                 ret -= bmd->bi_size;
2138         else
2139                 ret = 0;
2140
2141         /*
2142          * Don't send back more than was asked for.  And if the bio
2143          * was empty, let the whole thing through because:  "Note
2144          * that a block device *must* allow a single page to be
2145          * added to an empty bio."
2146          */
2147         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2148         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2149                 ret = (int) bvec->bv_len;
2150
2151         return ret;
2152 }
2153
2154 static void rbd_free_disk(struct rbd_device *rbd_dev)
2155 {
2156         struct gendisk *disk = rbd_dev->disk;
2157
2158         if (!disk)
2159                 return;
2160
2161         if (disk->flags & GENHD_FL_UP)
2162                 del_gendisk(disk);
2163         if (disk->queue)
2164                 blk_cleanup_queue(disk->queue);
2165         put_disk(disk);
2166 }
2167
2168 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2169                                 const char *object_name,
2170                                 u64 offset, u64 length,
2171                                 char *buf, u64 *version)
2172
2173 {
2174         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2175         struct rbd_obj_request *obj_request;
2176         struct page **pages = NULL;
2177         u32 page_count;
2178         size_t size;
2179         int ret;
2180
2181         page_count = (u32) calc_pages_for(offset, length);
2182         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2183         if (IS_ERR(pages))
2184                 return PTR_ERR(pages);
2185
2186         ret = -ENOMEM;
2187         obj_request = rbd_obj_request_create(object_name, offset, length,
2188                                                         OBJ_REQUEST_PAGES);
2189         if (!obj_request)
2190                 goto out;
2191
2192         obj_request->pages = pages;
2193         obj_request->page_count = page_count;
2194
2195         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2196         if (!obj_request->osd_req)
2197                 goto out;
2198
2199         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2200                                         offset, length, 0, 0);
2201         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2202                                         obj_request->pages,
2203                                         obj_request->length,
2204                                         obj_request->offset & ~PAGE_MASK,
2205                                         false, false);
2206         rbd_osd_req_format(obj_request, false);
2207
2208         ret = rbd_obj_request_submit(osdc, obj_request);
2209         if (ret)
2210                 goto out;
2211         ret = rbd_obj_request_wait(obj_request);
2212         if (ret)
2213                 goto out;
2214
2215         ret = obj_request->result;
2216         if (ret < 0)
2217                 goto out;
2218
2219         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2220         size = (size_t) obj_request->xferred;
2221         ceph_copy_from_page_vector(pages, buf, 0, size);
2222         rbd_assert(size <= (size_t) INT_MAX);
2223         ret = (int) size;
2224         if (version)
2225                 *version = obj_request->version;
2226 out:
2227         if (obj_request)
2228                 rbd_obj_request_put(obj_request);
2229         else
2230                 ceph_release_page_vector(pages, page_count);
2231
2232         return ret;
2233 }
2234
2235 /*
2236  * Read the complete header for the given rbd device.
2237  *
2238  * Returns a pointer to a dynamically-allocated buffer containing
2239  * the complete and validated header.  Caller can pass the address
2240  * of a variable that will be filled in with the version of the
2241  * header object at the time it was read.
2242  *
2243  * Returns a pointer-coded errno if a failure occurs.
2244  */
2245 static struct rbd_image_header_ondisk *
2246 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2247 {
2248         struct rbd_image_header_ondisk *ondisk = NULL;
2249         u32 snap_count = 0;
2250         u64 names_size = 0;
2251         u32 want_count;
2252         int ret;
2253
2254         /*
2255          * The complete header will include an array of its 64-bit
2256          * snapshot ids, followed by the names of those snapshots as
2257          * a contiguous block of NUL-terminated strings.  Note that
2258          * the number of snapshots could change by the time we read
2259          * it in, in which case we re-read it.
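         *
         * E.g., the first pass runs with snap_count == 0 and
         * names_size == 0, so it reads only the fixed-size part of the
         * header; that supplies the real snapshot count and names size
         * for a second pass.  A concurrent snapshot create or delete
         * can force additional passes.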
2260          */
2261         do {
2262                 size_t size;
2263
2264                 kfree(ondisk);
2265
2266                 size = sizeof (*ondisk);
2267                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2268                 size += names_size;
2269                 ondisk = kmalloc(size, GFP_KERNEL);
2270                 if (!ondisk)
2271                         return ERR_PTR(-ENOMEM);
2272
2273                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2274                                        0, size,
2275                                        (char *) ondisk, version);
2276                 if (ret < 0)
2277                         goto out_err;
2278                 if (WARN_ON((size_t) ret < size)) {
2279                         ret = -ENXIO;
2280                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2281                                 size, ret);
2282                         goto out_err;
2283                 }
2284                 if (!rbd_dev_ondisk_valid(ondisk)) {
2285                         ret = -ENXIO;
2286                         rbd_warn(rbd_dev, "invalid header");
2287                         goto out_err;
2288                 }
2289
2290                 names_size = le64_to_cpu(ondisk->snap_names_len);
2291                 want_count = snap_count;
2292                 snap_count = le32_to_cpu(ondisk->snap_count);
2293         } while (snap_count != want_count);
2294
2295         return ondisk;
2296
2297 out_err:
2298         kfree(ondisk);
2299
2300         return ERR_PTR(ret);
2301 }
2302
2303 /*
2304  * Reload the image header from the on-disk header object.
2305  */
2306 static int rbd_read_header(struct rbd_device *rbd_dev,
2307                            struct rbd_image_header *header)
2308 {
2309         struct rbd_image_header_ondisk *ondisk;
2310         u64 ver = 0;
2311         int ret;
2312
2313         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2314         if (IS_ERR(ondisk))
2315                 return PTR_ERR(ondisk);
2316         ret = rbd_header_from_disk(header, ondisk);
2317         if (ret >= 0)
2318                 header->obj_version = ver;
2319         kfree(ondisk);
2320
2321         return ret;
2322 }
2323
2324 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2325 {
2326         struct rbd_snap *snap;
2327         struct rbd_snap *next;
2328
2329         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2330                 rbd_remove_snap_dev(snap);
2331 }
2332
2333 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2334 {
2335         sector_t size;
2336
2337         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2338                 return;
2339
2340         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2341         dout("setting size to %llu sectors", (unsigned long long) size);
2342         rbd_dev->mapping.size = (u64) size;
2343         set_capacity(rbd_dev->disk, size);
2344 }
2345
2346 /*
2347  * Refresh the format 1 image header from the on-disk header object.
2348  */
2349 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2350 {
2351         int ret;
2352         struct rbd_image_header h;
2353
2354         ret = rbd_read_header(rbd_dev, &h);
2355         if (ret < 0)
2356                 return ret;
2357
2358         down_write(&rbd_dev->header_rwsem);
2359
2360         /* Update image size, and check for resize of mapped image */
2361         rbd_dev->header.image_size = h.image_size;
2362         rbd_update_mapping_size(rbd_dev);
2363
2364         /* rbd_dev->header.object_prefix shouldn't change */
2365         kfree(rbd_dev->header.snap_sizes);
2366         kfree(rbd_dev->header.snap_names);
2367         /* osd requests may still refer to snapc */
2368         ceph_put_snap_context(rbd_dev->header.snapc);
2369
2370         if (hver)
2371                 *hver = h.obj_version;
2372         rbd_dev->header.obj_version = h.obj_version;
2373         rbd_dev->header.image_size = h.image_size;
2374         rbd_dev->header.snapc = h.snapc;
2375         rbd_dev->header.snap_names = h.snap_names;
2376         rbd_dev->header.snap_sizes = h.snap_sizes;
2377         /* Free the extra copy of the object prefix */
2378         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2379         kfree(h.object_prefix);
2380
2381         ret = rbd_dev_snaps_update(rbd_dev);
2382         if (!ret)
2383                 ret = rbd_dev_snaps_register(rbd_dev);
2384
2385         up_write(&rbd_dev->header_rwsem);
2386
2387         return ret;
2388 }
2389
2390 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2391 {
2392         int ret;
2393
2394         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2395         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2396         if (rbd_dev->image_format == 1)
2397                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2398         else
2399                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2400         mutex_unlock(&ctl_mutex);
2401
2402         return ret;
2403 }
2404
2405 static int rbd_init_disk(struct rbd_device *rbd_dev)
2406 {
2407         struct gendisk *disk;
2408         struct request_queue *q;
2409         u64 segment_size;
2410
2411         /* create gendisk info */
2412         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2413         if (!disk)
2414                 return -ENOMEM;
2415
2416         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2417                  rbd_dev->dev_id);
2418         disk->major = rbd_dev->major;
2419         disk->first_minor = 0;
2420         disk->fops = &rbd_bd_ops;
2421         disk->private_data = rbd_dev;
2422
2423         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2424         if (!q)
2425                 goto out_disk;
2426
2427         /* We use the default size, but let's be explicit about it. */
2428         blk_queue_physical_block_size(q, SECTOR_SIZE);
2429
2430         /* set io sizes to object size */
2431         segment_size = rbd_obj_bytes(&rbd_dev->header);
2432         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2433         blk_queue_max_segment_size(q, segment_size);
2434         blk_queue_io_min(q, segment_size);
2435         blk_queue_io_opt(q, segment_size);
2436
2437         blk_queue_merge_bvec(q, rbd_merge_bvec);
2438         disk->queue = q;
2439
2440         q->queuedata = rbd_dev;
2441
2442         rbd_dev->disk = disk;
2443
2444         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2445
2446         return 0;
2447 out_disk:
2448         put_disk(disk);
2449
2450         return -ENOMEM;
2451 }
2452
2453 /*
2454   sysfs
2455 */
2456
2457 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2458 {
2459         return container_of(dev, struct rbd_device, dev);
2460 }
2461
2462 static ssize_t rbd_size_show(struct device *dev,
2463                              struct device_attribute *attr, char *buf)
2464 {
2465         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2466         sector_t size;
2467
2468         down_read(&rbd_dev->header_rwsem);
2469         size = get_capacity(rbd_dev->disk);
2470         up_read(&rbd_dev->header_rwsem);
2471
2472         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2473 }
2474
2475 /*
2476  * Note this shows the features for whatever's mapped, which is not
2477  * necessarily the base image.
2478  */
2479 static ssize_t rbd_features_show(struct device *dev,
2480                              struct device_attribute *attr, char *buf)
2481 {
2482         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2483
2484         return sprintf(buf, "0x%016llx\n",
2485                         (unsigned long long) rbd_dev->mapping.features);
2486 }
2487
2488 static ssize_t rbd_major_show(struct device *dev,
2489                               struct device_attribute *attr, char *buf)
2490 {
2491         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2492
2493         return sprintf(buf, "%d\n", rbd_dev->major);
2494 }
2495
2496 static ssize_t rbd_client_id_show(struct device *dev,
2497                                   struct device_attribute *attr, char *buf)
2498 {
2499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2500
2501         return sprintf(buf, "client%lld\n",
2502                         ceph_client_id(rbd_dev->rbd_client->client));
2503 }
2504
2505 static ssize_t rbd_pool_show(struct device *dev,
2506                              struct device_attribute *attr, char *buf)
2507 {
2508         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2509
2510         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2511 }
2512
2513 static ssize_t rbd_pool_id_show(struct device *dev,
2514                              struct device_attribute *attr, char *buf)
2515 {
2516         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2517
2518         return sprintf(buf, "%llu\n",
2519                 (unsigned long long) rbd_dev->spec->pool_id);
2520 }
2521
2522 static ssize_t rbd_name_show(struct device *dev,
2523                              struct device_attribute *attr, char *buf)
2524 {
2525         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2526
2527         if (rbd_dev->spec->image_name)
2528                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2529
2530         return sprintf(buf, "(unknown)\n");
2531 }
2532
2533 static ssize_t rbd_image_id_show(struct device *dev,
2534                              struct device_attribute *attr, char *buf)
2535 {
2536         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2537
2538         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2539 }
2540
2541 /*
2542  * Shows the name of the currently-mapped snapshot (or
2543  * RBD_SNAP_HEAD_NAME for the base image).
2544  */
2545 static ssize_t rbd_snap_show(struct device *dev,
2546                              struct device_attribute *attr,
2547                              char *buf)
2548 {
2549         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2550
2551         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2552 }
2553
2554 /*
2555  * For an rbd v2 image, shows the ids and names of the pool, image,
2556  * and snapshot for the parent image, plus the parent overlap.  If
2557  * there is no parent, simply shows "(no parent image)".
2558  */
2559 static ssize_t rbd_parent_show(struct device *dev,
2560                              struct device_attribute *attr,
2561                              char *buf)
2562 {
2563         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2564         struct rbd_spec *spec = rbd_dev->parent_spec;
2565         int count;
2566         char *bufp = buf;
2567
2568         if (!spec)
2569                 return sprintf(buf, "(no parent image)\n");
2570
2571         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2572                         (unsigned long long) spec->pool_id, spec->pool_name);
2573         if (count < 0)
2574                 return count;
2575         bufp += count;
2576
2577         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2578                         spec->image_name ? spec->image_name : "(unknown)");
2579         if (count < 0)
2580                 return count;
2581         bufp += count;
2582
2583         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2584                         (unsigned long long) spec->snap_id, spec->snap_name);
2585         if (count < 0)
2586                 return count;
2587         bufp += count;
2588
2589         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2590         if (count < 0)
2591                 return count;
2592         bufp += count;
2593
2594         return (ssize_t) (bufp - buf);
2595 }
2596
2597 static ssize_t rbd_image_refresh(struct device *dev,
2598                                  struct device_attribute *attr,
2599                                  const char *buf,
2600                                  size_t size)
2601 {
2602         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2603         int ret;
2604
2605         ret = rbd_dev_refresh(rbd_dev, NULL);
2606
2607         return ret < 0 ? ret : size;
2608 }
2609
2610 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2611 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2612 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2613 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2614 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2615 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2616 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2617 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2618 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2619 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2620 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2621
2622 static struct attribute *rbd_attrs[] = {
2623         &dev_attr_size.attr,
2624         &dev_attr_features.attr,
2625         &dev_attr_major.attr,
2626         &dev_attr_client_id.attr,
2627         &dev_attr_pool.attr,
2628         &dev_attr_pool_id.attr,
2629         &dev_attr_name.attr,
2630         &dev_attr_image_id.attr,
2631         &dev_attr_current_snap.attr,
2632         &dev_attr_parent.attr,
2633         &dev_attr_refresh.attr,
2634         NULL
2635 };
2636
2637 static struct attribute_group rbd_attr_group = {
2638         .attrs = rbd_attrs,
2639 };
2640
2641 static const struct attribute_group *rbd_attr_groups[] = {
2642         &rbd_attr_group,
2643         NULL
2644 };
2645
2646 static void rbd_sysfs_dev_release(struct device *dev)
2647 {
2648 }
2649
2650 static struct device_type rbd_device_type = {
2651         .name           = "rbd",
2652         .groups         = rbd_attr_groups,
2653         .release        = rbd_sysfs_dev_release,
2654 };
2655
2656
2657 /*
2658   sysfs - snapshots
2659 */
2660
2661 static ssize_t rbd_snap_size_show(struct device *dev,
2662                                   struct device_attribute *attr,
2663                                   char *buf)
2664 {
2665         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2666
2667         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2668 }
2669
2670 static ssize_t rbd_snap_id_show(struct device *dev,
2671                                 struct device_attribute *attr,
2672                                 char *buf)
2673 {
2674         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2675
2676         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2677 }
2678
2679 static ssize_t rbd_snap_features_show(struct device *dev,
2680                                 struct device_attribute *attr,
2681                                 char *buf)
2682 {
2683         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2684
2685         return sprintf(buf, "0x%016llx\n",
2686                         (unsigned long long) snap->features);
2687 }
2688
2689 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2690 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2691 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2692
2693 static struct attribute *rbd_snap_attrs[] = {
2694         &dev_attr_snap_size.attr,
2695         &dev_attr_snap_id.attr,
2696         &dev_attr_snap_features.attr,
2697         NULL,
2698 };
2699
2700 static struct attribute_group rbd_snap_attr_group = {
2701         .attrs = rbd_snap_attrs,
2702 };
2703
2704 static void rbd_snap_dev_release(struct device *dev)
2705 {
2706         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2707         kfree(snap->name);
2708         kfree(snap);
2709 }
2710
2711 static const struct attribute_group *rbd_snap_attr_groups[] = {
2712         &rbd_snap_attr_group,
2713         NULL
2714 };
2715
2716 static struct device_type rbd_snap_device_type = {
2717         .groups         = rbd_snap_attr_groups,
2718         .release        = rbd_snap_dev_release,
2719 };
2720
2721 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2722 {
2723         kref_get(&spec->kref);
2724
2725         return spec;
2726 }
2727
2728 static void rbd_spec_free(struct kref *kref);
2729 static void rbd_spec_put(struct rbd_spec *spec)
2730 {
2731         if (spec)
2732                 kref_put(&spec->kref, rbd_spec_free);
2733 }
2734
2735 static struct rbd_spec *rbd_spec_alloc(void)
2736 {
2737         struct rbd_spec *spec;
2738
2739         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2740         if (!spec)
2741                 return NULL;
2742         kref_init(&spec->kref);
2743
2744         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2745
2746         return spec;
2747 }
2748
2749 static void rbd_spec_free(struct kref *kref)
2750 {
2751         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2752
2753         kfree(spec->pool_name);
2754         kfree(spec->image_id);
2755         kfree(spec->image_name);
2756         kfree(spec->snap_name);
2757         kfree(spec);
2758 }
2759
2760 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2761                                 struct rbd_spec *spec)
2762 {
2763         struct rbd_device *rbd_dev;
2764
2765         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2766         if (!rbd_dev)
2767                 return NULL;
2768
2769         spin_lock_init(&rbd_dev->lock);
2770         rbd_dev->flags = 0;
2771         INIT_LIST_HEAD(&rbd_dev->node);
2772         INIT_LIST_HEAD(&rbd_dev->snaps);
2773         init_rwsem(&rbd_dev->header_rwsem);
2774
2775         rbd_dev->spec = spec;
2776         rbd_dev->rbd_client = rbdc;
2777
2778         /* Initialize the layout used for all rbd requests */
2779
2780         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2781         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2782         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2783         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
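
        /*
         * Note: a stripe unit equal to the object size with a stripe
         * count of one describes the simple rbd layout; each object
         * holds one contiguous chunk of the image, with no cross-object
         * striping.
         */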
2784
2785         return rbd_dev;
2786 }
2787
2788 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2789 {
2790         rbd_spec_put(rbd_dev->parent_spec);
2791         kfree(rbd_dev->header_name);
2792         rbd_put_client(rbd_dev->rbd_client);
2793         rbd_spec_put(rbd_dev->spec);
2794         kfree(rbd_dev);
2795 }
2796
2797 static bool rbd_snap_registered(struct rbd_snap *snap)
2798 {
2799         bool ret = snap->dev.type == &rbd_snap_device_type;
2800         bool reg = device_is_registered(&snap->dev);
2801
2802         rbd_assert(!ret ^ reg);
2803
2804         return ret;
2805 }
2806
2807 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2808 {
2809         list_del(&snap->node);
2810         if (device_is_registered(&snap->dev))
2811                 device_unregister(&snap->dev);
2812 }
2813
2814 static int rbd_register_snap_dev(struct rbd_snap *snap,
2815                                   struct device *parent)
2816 {
2817         struct device *dev = &snap->dev;
2818         int ret;
2819
2820         dev->type = &rbd_snap_device_type;
2821         dev->parent = parent;
2822         dev->release = rbd_snap_dev_release;
2823         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2824         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2825
2826         ret = device_register(dev);
2827
2828         return ret;
2829 }
2830
2831 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2832                                                 const char *snap_name,
2833                                                 u64 snap_id, u64 snap_size,
2834                                                 u64 snap_features)
2835 {
2836         struct rbd_snap *snap;
2837         int ret;
2838
2839         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2840         if (!snap)
2841                 return ERR_PTR(-ENOMEM);
2842
2843         ret = -ENOMEM;
2844         snap->name = kstrdup(snap_name, GFP_KERNEL);
2845         if (!snap->name)
2846                 goto err;
2847
2848         snap->id = snap_id;
2849         snap->size = snap_size;
2850         snap->features = snap_features;
2851
2852         return snap;
2853
2854 err:
2855         kfree(snap->name);
2856         kfree(snap);
2857
2858         return ERR_PTR(ret);
2859 }
2860
2861 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2862                 u64 *snap_size, u64 *snap_features)
2863 {
2864         char *snap_name;
2865
2866         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2867
2868         *snap_size = rbd_dev->header.snap_sizes[which];
2869         *snap_features = 0;     /* No features for v1 */
2870
2871         /* Skip over names until we find the one we are looking for */
2872
2873         snap_name = rbd_dev->header.snap_names;
2874         while (which--)
2875                 snap_name += strlen(snap_name) + 1;
2876
2877         return snap_name;
2878 }
2879
2880 /*
2881  * Get the size and object order for an image snapshot, or, if
2882  * snap_id is CEPH_NOSNAP, get this information for the base
2883  * image.
2884  */
2885 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2886                                 u8 *order, u64 *snap_size)
2887 {
2888         __le64 snapid = cpu_to_le64(snap_id);
2889         int ret;
2890         struct {
2891                 u8 order;
2892                 __le64 size;
2893         } __attribute__ ((packed)) size_buf = { 0 };
2894
2895         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2896                                 "rbd", "get_size",
2897                                 (char *) &snapid, sizeof (snapid),
2898                                 (char *) &size_buf, sizeof (size_buf), NULL);
2899         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2900         if (ret < 0)
2901                 return ret;
2902
2903         *order = size_buf.order;
2904         *snap_size = le64_to_cpu(size_buf.size);
2905
2906         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2907                 (unsigned long long) snap_id, (unsigned int) *order,
2908                 (unsigned long long) *snap_size);
2909
2910         return 0;
2911 }
2912
2913 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2914 {
2915         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2916                                         &rbd_dev->header.obj_order,
2917                                         &rbd_dev->header.image_size);
2918 }
2919
2920 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2921 {
2922         void *reply_buf;
2923         int ret;
2924         void *p;
2925
2926         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2927         if (!reply_buf)
2928                 return -ENOMEM;
2929
2930         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2931                                 "rbd", "get_object_prefix",
2932                                 NULL, 0,
2933                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2934         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2935         if (ret < 0)
2936                 goto out;
2937
2938         p = reply_buf;
2939         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2940                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2941                                                 NULL, GFP_NOIO);
2942
2943         if (IS_ERR(rbd_dev->header.object_prefix)) {
2944                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2945                 rbd_dev->header.object_prefix = NULL;
2946         } else {
2947                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2948         }
2949
2950 out:
2951         kfree(reply_buf);
2952
2953         return ret;
2954 }
2955
2956 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2957                 u64 *snap_features)
2958 {
2959         __le64 snapid = cpu_to_le64(snap_id);
2960         struct {
2961                 __le64 features;
2962                 __le64 incompat;
2963         } features_buf = { 0 };
2964         u64 incompat;
2965         int ret;
2966
2967         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2968                                 "rbd", "get_features",
2969                                 (char *) &snapid, sizeof (snapid),
2970                                 (char *) &features_buf, sizeof (features_buf),
2971                                 NULL);
2972         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2973         if (ret < 0)
2974                 return ret;
2975
2976         incompat = le64_to_cpu(features_buf.incompat);
2977         if (incompat & ~RBD_FEATURES_SUPPORTED)
2978                 return -ENXIO;
2979
2980         *snap_features = le64_to_cpu(features_buf.features);
2981
2982         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2983                 (unsigned long long) snap_id,
2984                 (unsigned long long) *snap_features,
2985                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2986
2987         return 0;
2988 }
2989
2990 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2991 {
2992         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2993                                                 &rbd_dev->header.features);
2994 }
2995
2996 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2997 {
2998         struct rbd_spec *parent_spec;
2999         size_t size;
3000         void *reply_buf = NULL;
3001         __le64 snapid;
3002         void *p;
3003         void *end;
3004         char *image_id;
3005         u64 overlap;
3006         int ret;
3007
3008         parent_spec = rbd_spec_alloc();
3009         if (!parent_spec)
3010                 return -ENOMEM;
3011
3012         size = sizeof (__le64) +                                /* pool_id */
3013                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3014                 sizeof (__le64) +                               /* snap_id */
3015                 sizeof (__le64);                                /* overlap */
3016         reply_buf = kmalloc(size, GFP_KERNEL);
3017         if (!reply_buf) {
3018                 ret = -ENOMEM;
3019                 goto out_err;
3020         }
3021
3022         snapid = cpu_to_le64(CEPH_NOSNAP);
3023         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3024                                 "rbd", "get_parent",
3025                                 (char *) &snapid, sizeof (snapid),
3026                                 (char *) reply_buf, size, NULL);
3027         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3028         if (ret < 0)
3029                 goto out_err;
3030
3031         ret = -ERANGE;
3032         p = reply_buf;
3033         end = (char *) reply_buf + size;
3034         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3035         if (parent_spec->pool_id == CEPH_NOPOOL)
3036                 goto out;       /* No parent?  No problem. */
3037
3038         /* The ceph file layout needs to fit pool id in 32 bits */
3039
3040         ret = -EIO;
3041         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3042                 goto out;
3043
3044         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3045         if (IS_ERR(image_id)) {
3046                 ret = PTR_ERR(image_id);
3047                 goto out_err;
3048         }
3049         parent_spec->image_id = image_id;
3050         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3051         ceph_decode_64_safe(&p, end, overlap, out_err);
3052
3053         rbd_dev->parent_overlap = overlap;
3054         rbd_dev->parent_spec = parent_spec;
3055         parent_spec = NULL;     /* rbd_dev now owns this */
3056 out:
3057         ret = 0;
3058 out_err:
3059         kfree(reply_buf);
3060         rbd_spec_put(parent_spec);
3061
3062         return ret;
3063 }
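
/*
 * Illustration (not driver code): the "get_parent" reply decoded above
 * is a packed little-endian buffer laid out as:
 *
 *     __le64  pool_id           parent pool (CEPH_NOPOOL if no parent)
 *     __le32  image_id_len
 *     u8      image_id[]        image_id_len bytes, no trailing NUL
 *     __le64  snap_id           parent snapshot id
 *     __le64  overlap           bytes of the parent visible to the clone
 *
 * which is why the reply buffer above reserves room for each field,
 * with RBD_IMAGE_ID_LEN_MAX bounding the variable-length image id.
 */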
3064
3065 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3066 {
3067         size_t image_id_size;
3068         char *image_id;
3069         void *p;
3070         void *end;
3071         size_t size;
3072         void *reply_buf = NULL;
3073         size_t len = 0;
3074         char *image_name = NULL;
3075         int ret;
3076
3077         rbd_assert(!rbd_dev->spec->image_name);
3078
3079         len = strlen(rbd_dev->spec->image_id);
3080         image_id_size = sizeof (__le32) + len;
3081         image_id = kmalloc(image_id_size, GFP_KERNEL);
3082         if (!image_id)
3083                 return NULL;
3084
3085         p = image_id;
3086         end = (char *) image_id + image_id_size;
3087         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3088
3089         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3090         reply_buf = kmalloc(size, GFP_KERNEL);
3091         if (!reply_buf)
3092                 goto out;
3093
3094         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3095                                 "rbd", "dir_get_name",
3096                                 image_id, image_id_size,
3097                                 (char *) reply_buf, size, NULL);
3098         if (ret < 0)
3099                 goto out;
3100         p = reply_buf;
3101         end = (char *) reply_buf + size;
3102         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3103         if (IS_ERR(image_name))
3104                 image_name = NULL;
3105         else
3106                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3107 out:
3108         kfree(reply_buf);
3109         kfree(image_id);
3110
3111         return image_name;
3112 }
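
/*
 * Illustration (not driver code): both the request and the reply above
 * use ceph's length-prefixed string encoding.  A hypothetical image id
 * "10026b8b4567" travels as:
 *
 *     __le32  0x0000000c          (length 12)
 *     bytes   "10026b8b4567"      (no trailing NUL on the wire)
 *
 * ceph_extract_encoded_string() undoes this, returning a freshly
 * allocated NUL-terminated copy for in-kernel use.
 */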
3113
3114 /*
3115  * When a parent image gets probed, we only have the pool, image,
3116  * and snapshot ids but not the names of any of them.  This call
3117  * is made later to fill in those names.  It has to be done after
3118  * rbd_dev_snaps_update() has completed because some of the
3119  * information (in particular, snapshot name) is not available
3120  * until then.
3121  */
3122 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3123 {
3124         struct ceph_osd_client *osdc;
3125         const char *name;
3127         int ret;
3128
3129         if (rbd_dev->spec->pool_name)
3130                 return 0;       /* Already have the names */
3131
3132         /* Look up the pool name */
3133
3134         osdc = &rbd_dev->rbd_client->client->osdc;
3135         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3136         if (!name) {
3137                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3138                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3139                 return -EIO;
3140         }
3141
3142         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3143         if (!rbd_dev->spec->pool_name)
3144                 return -ENOMEM;
3145
3146         /* Fetch the image name; tolerate failure here */
3147
3148         name = rbd_dev_image_name(rbd_dev);
3149         if (name)
3150                 rbd_dev->spec->image_name = (char *) name;
3151         else
3152                 rbd_warn(rbd_dev, "unable to get image name");
3153
3154         /* Look up the snapshot name. */
3155
3156         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3157         if (!name) {
3158                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3159                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3160                 ret = -EIO;
3161                 goto out_err;
3162         }
        rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
        if (!rbd_dev->spec->snap_name) {
                ret = -ENOMEM;
                goto out_err;
        }
3166
3167         return 0;
3168 out_err:
3170         kfree(rbd_dev->spec->pool_name);
3171         rbd_dev->spec->pool_name = NULL;
3172
3173         return ret;
3174 }
3175
3176 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3177 {
3178         size_t size;
3179         int ret;
3180         void *reply_buf;
3181         void *p;
3182         void *end;
3183         u64 seq;
3184         u32 snap_count;
3185         struct ceph_snap_context *snapc;
3186         u32 i;
3187
3188         /*
3189          * We'll need room for the seq value (maximum snapshot id),
3190          * snapshot count, and array of that many snapshot ids.
3191          * For now we have a fixed upper limit on the number we're
3192          * prepared to receive.
3193          */
3194         size = sizeof (__le64) + sizeof (__le32) +
3195                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3196         reply_buf = kzalloc(size, GFP_KERNEL);
3197         if (!reply_buf)
3198                 return -ENOMEM;
3199
3200         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3201                                 "rbd", "get_snapcontext",
3202                                 NULL, 0,
3203                                 reply_buf, size, ver);
3204         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3205         if (ret < 0)
3206                 goto out;
3207
3208         ret = -ERANGE;
3209         p = reply_buf;
3210         end = (char *) reply_buf + size;
3211         ceph_decode_64_safe(&p, end, seq, out);
3212         ceph_decode_32_safe(&p, end, snap_count, out);
3213
3214         /*
3215          * Make sure the reported number of snapshot ids wouldn't go
3216          * beyond the end of our buffer.  But before checking that,
3217          * make sure the computed size of the snapshot context we
3218          * allocate is representable in a size_t.
3219          */
3220         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3221                                  / sizeof (u64)) {
3222                 ret = -EINVAL;
3223                 goto out;
3224         }
3225         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3226                 goto out;
3227
3228         size = sizeof (struct ceph_snap_context) +
3229                                 snap_count * sizeof (snapc->snaps[0]);
3230         snapc = kmalloc(size, GFP_KERNEL);
3231         if (!snapc) {
3232                 ret = -ENOMEM;
3233                 goto out;
3234         }
3235
3236         atomic_set(&snapc->nref, 1);
3237         snapc->seq = seq;
3238         snapc->num_snaps = snap_count;
3239         for (i = 0; i < snap_count; i++)
3240                 snapc->snaps[i] = ceph_decode_64(&p);
3241
3242         rbd_dev->header.snapc = snapc;
3243
3244         dout("  snap context seq = %llu, snap_count = %u\n",
3245                 (unsigned long long) seq, (unsigned int) snap_count);

        ret = 0;
out:
        kfree(reply_buf);

        return ret;
3251 }
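
/*
 * Illustration (not driver code): the fixed reply size used above
 * comes to just under one 4 KB page:
 *
 *     sizeof (__le64)                  8 bytes   (seq)
 *     sizeof (__le32)                  4 bytes   (snap_count)
 *     RBD_MAX_SNAP_COUNT * 8        4080 bytes   (510 snapshot ids)
 *                                   ----------
 *                                   4092 bytes
 *
 * so the largest possible snapshot context fits in a single page.
 */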
3252
3253 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3254 {
3255         size_t size;
3256         void *reply_buf;
3257         __le64 snap_id;
3258         int ret;
3259         void *p;
3260         void *end;
3261         char *snap_name;
3262
3263         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3264         reply_buf = kmalloc(size, GFP_KERNEL);
3265         if (!reply_buf)
3266                 return ERR_PTR(-ENOMEM);
3267
3268         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3269         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3270                                 "rbd", "get_snapshot_name",
3271                                 (char *) &snap_id, sizeof (snap_id),
3272                                 reply_buf, size, NULL);
3273         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3274         if (ret < 0)
3275                 goto out;
3276
3277         p = reply_buf;
3278         end = (char *) reply_buf + size;
3279         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3280         if (IS_ERR(snap_name)) {
3281                 ret = PTR_ERR(snap_name);
3282                 goto out;
3283         } else {
3284                 dout("  snap_id 0x%016llx snap_name = %s\n",
3285                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3286         }
3287         kfree(reply_buf);
3288
3289         return snap_name;
3290 out:
3291         kfree(reply_buf);
3292
3293         return ERR_PTR(ret);
3294 }
3295
3296 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3297                 u64 *snap_size, u64 *snap_features)
3298 {
3299         u64 snap_id;
3300         u8 order;
3301         int ret;
3302
3303         snap_id = rbd_dev->header.snapc->snaps[which];
3304         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3305         if (ret)
3306                 return ERR_PTR(ret);
3307         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3308         if (ret)
3309                 return ERR_PTR(ret);
3310
3311         return rbd_dev_v2_snap_name(rbd_dev, which);
3312 }
3313
3314 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3315                 u64 *snap_size, u64 *snap_features)
3316 {
3317         if (rbd_dev->image_format == 1)
3318                 return rbd_dev_v1_snap_info(rbd_dev, which,
3319                                         snap_size, snap_features);
3320         if (rbd_dev->image_format == 2)
3321                 return rbd_dev_v2_snap_info(rbd_dev, which,
3322                                         snap_size, snap_features);
3323         return ERR_PTR(-EINVAL);
3324 }
3325
3326 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3327 {
3328         int ret;
3329         __u8 obj_order;
3330
3331         down_write(&rbd_dev->header_rwsem);
3332
3333         /* Grab old order first, to see if it changes */
3334
        obj_order = rbd_dev->header.obj_order;
3336         ret = rbd_dev_v2_image_size(rbd_dev);
3337         if (ret)
3338                 goto out;
3339         if (rbd_dev->header.obj_order != obj_order) {
3340                 ret = -EIO;
3341                 goto out;
3342         }
3343         rbd_update_mapping_size(rbd_dev);
3344
3345         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3346         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3347         if (ret)
3348                 goto out;
3349         ret = rbd_dev_snaps_update(rbd_dev);
3350         dout("rbd_dev_snaps_update returned %d\n", ret);
3351         if (ret)
3352                 goto out;
3353         ret = rbd_dev_snaps_register(rbd_dev);
3354         dout("rbd_dev_snaps_register returned %d\n", ret);
3355 out:
3356         up_write(&rbd_dev->header_rwsem);
3357
3358         return ret;
3359 }
3360
3361 /*
3362  * Scan the rbd device's current snapshot list and compare it to the
3363  * newly-received snapshot context.  Remove any existing snapshots
3364  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
3366  * And verify there are no changes to snapshots we already know
3367  * about.
3368  *
3369  * Assumes the snapshots in the snapshot context are sorted by
3370  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3371  * are also maintained in that order.)
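 *
 * For illustration only (made-up ids): given an existing snapshot
 * list {12, 8, 3} and a new snapshot context {12, 10, 3}, the loop
 * below keeps 12, inserts 10 ahead of 8, removes 8, then keeps 3 --
 * a single merge pass over two sorted sequences.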
3372  */
3373 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3374 {
3375         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3376         const u32 snap_count = snapc->num_snaps;
3377         struct list_head *head = &rbd_dev->snaps;
3378         struct list_head *links = head->next;
3379         u32 index = 0;
3380
3381         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3382         while (index < snap_count || links != head) {
3383                 u64 snap_id;
3384                 struct rbd_snap *snap;
3385                 char *snap_name;
3386                 u64 snap_size = 0;
3387                 u64 snap_features = 0;
3388
3389                 snap_id = index < snap_count ? snapc->snaps[index]
3390                                              : CEPH_NOSNAP;
3391                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3392                                      : NULL;
3393                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3394
3395                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3396                         struct list_head *next = links->next;
3397
3398                         /*
3399                          * A previously-existing snapshot is not in
3400                          * the new snap context.
3401                          *
3402                          * If the now missing snapshot is the one the
3403                          * image is mapped to, clear its exists flag
3404                          * so we can avoid sending any more requests
3405                          * to it.
3406                          */
                        if (rbd_dev->spec->snap_id == snap->id)
                                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
                        /* Log first; "snap" may be freed by the removal */
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->spec->snap_id == snap->id ?
                                                        "mapped " : "",
                                (unsigned long long) snap->id);
                        rbd_remove_snap_dev(snap);
3414
3415                         /* Done with this list entry; advance */
3416
3417                         links = next;
3418                         continue;
3419                 }
3420
3421                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3422                                         &snap_size, &snap_features);
3423                 if (IS_ERR(snap_name))
3424                         return PTR_ERR(snap_name);
3425
                dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3427                         (unsigned long long) snap_id);
3428                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3429                         struct rbd_snap *new_snap;
3430
3431                         /* We haven't seen this snapshot before */
3432
3433                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3434                                         snap_id, snap_size, snap_features);
3435                         if (IS_ERR(new_snap)) {
3436                                 int err = PTR_ERR(new_snap);
3437
3438                                 dout("  failed to add dev, error %d\n", err);
3439
3440                                 return err;
3441                         }
3442
3443                         /* New goes before existing, or at end of list */
3444
                        dout("  added dev%s\n", snap ? "" : " at end");
3446                         if (snap)
3447                                 list_add_tail(&new_snap->node, &snap->node);
3448                         else
3449                                 list_add_tail(&new_snap->node, head);
3450                 } else {
3451                         /* Already have this one */
3452
3453                         dout("  already present\n");
3454
3455                         rbd_assert(snap->size == snap_size);
3456                         rbd_assert(!strcmp(snap->name, snap_name));
3457                         rbd_assert(snap->features == snap_features);
3458
3459                         /* Done with this list entry; advance */
3460
3461                         links = links->next;
3462                 }
3463
3464                 /* Advance to the next entry in the snapshot context */
3465
3466                 index++;
3467         }
3468         dout("%s: done\n", __func__);
3469
3470         return 0;
3471 }
3472
3473 /*
3474  * Scan the list of snapshots and register the devices for any that
3475  * have not already been registered.
3476  */
3477 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3478 {
3479         struct rbd_snap *snap;
3480         int ret = 0;
3481
3482         dout("%s:\n", __func__);
3483         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3484                 return -EIO;
3485
3486         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3487                 if (!rbd_snap_registered(snap)) {
3488                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3489                         if (ret < 0)
3490                                 break;
3491                 }
3492         }
3493         dout("%s: returning %d\n", __func__, ret);
3494
3495         return ret;
3496 }
3497
3498 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3499 {
3500         struct device *dev;
3501         int ret;
3502
3503         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3504
3505         dev = &rbd_dev->dev;
3506         dev->bus = &rbd_bus_type;
3507         dev->type = &rbd_device_type;
3508         dev->parent = &rbd_root_dev;
3509         dev->release = rbd_dev_release;
3510         dev_set_name(dev, "%d", rbd_dev->dev_id);
3511         ret = device_register(dev);
3512
3513         mutex_unlock(&ctl_mutex);
3514
3515         return ret;
3516 }
3517
3518 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3519 {
3520         device_unregister(&rbd_dev->dev);
3521 }
3522
3523 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3524
3525 /*
3526  * Get a unique rbd identifier for the given new rbd_dev, and add
3527  * the rbd_dev to the global list.  The minimum rbd id is 1.
3528  */
3529 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3530 {
3531         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3532
3533         spin_lock(&rbd_dev_list_lock);
3534         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3535         spin_unlock(&rbd_dev_list_lock);
3536         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3537                 (unsigned long long) rbd_dev->dev_id);
3538 }
3539
3540 /*
3541  * Remove an rbd_dev from the global list, and record that its
3542  * identifier is no longer in use.
3543  */
3544 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3545 {
3546         struct list_head *tmp;
3547         int rbd_id = rbd_dev->dev_id;
3548         int max_id;
3549
3550         rbd_assert(rbd_id > 0);
3551
3552         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3553                 (unsigned long long) rbd_dev->dev_id);
3554         spin_lock(&rbd_dev_list_lock);
3555         list_del_init(&rbd_dev->node);
3556
3557         /*
3558          * If the id being "put" is not the current maximum, there
3559          * is nothing special we need to do.
3560          */
3561         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3562                 spin_unlock(&rbd_dev_list_lock);
3563                 return;
3564         }
3565
3566         /*
3567          * We need to update the current maximum id.  Search the
3568          * list to find out what it is.  We're more likely to find
3569          * the maximum at the end, so search the list backward.
3570          */
3571         max_id = 0;
3572         list_for_each_prev(tmp, &rbd_dev_list) {
3573                 struct rbd_device *rbd_dev;
3574
3575                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3576                 if (rbd_dev->dev_id > max_id)
3577                         max_id = rbd_dev->dev_id;
3578         }
3579         spin_unlock(&rbd_dev_list_lock);
3580
3581         /*
3582          * The max id could have been updated by rbd_dev_id_get(), in
3583          * which case it now accurately reflects the new maximum.
3584          * Be careful not to overwrite the maximum value in that
3585          * case.
3586          */
3587         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3588         dout("  max dev id has been reset\n");
3589 }
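
/*
 * Illustration (not driver code) of why cmpxchg() is used above: with
 * devices {1, 2, 3} mapped, suppose id 3 is put while another thread
 * concurrently gets id 4.  If the get runs between our list scan
 * (which found a new maximum of 2) and our update, rbd_dev_id_max is
 * already 4, the comparison against the expected value 3 fails, and
 * the newer maximum survives instead of being clobbered.
 */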
3590
3591 /*
3592  * Skips over white space at *buf, and updates *buf to point to the
3593  * first found non-space character (if any). Returns the length of
3594  * the token (string of non-white space characters) found.  Note
3595  * that *buf must be terminated with '\0'.
3596  */
3597 static inline size_t next_token(const char **buf)
3598 {
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
3603         const char *spaces = " \f\n\r\t\v";
3604
3605         *buf += strspn(*buf, spaces);   /* Find start of token */
3606
3607         return strcspn(*buf, spaces);   /* Return token length */
3608 }
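
/*
 * Illustration (not driver code): with *buf pointing at "  rbd foo\n",
 * next_token() advances *buf past the two leading spaces to
 * "rbd foo\n" and returns 3, the length of "rbd".
 */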
3609
3610 /*
3611  * Finds the next token in *buf, and if the provided token buffer is
3612  * big enough, copies the found token into it.  The result, if
3613  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3614  * must be terminated with '\0' on entry.
3615  *
3616  * Returns the length of the token found (not including the '\0').
3617  * Return value will be 0 if no token is found, and it will be >=
3618  * token_size if the token would not fit.
3619  *
3620  * The *buf pointer will be updated to point beyond the end of the
3621  * found token.  Note that this occurs even if the token buffer is
3622  * too small to hold it.
3623  */
3624 static inline size_t copy_token(const char **buf,
3625                                 char *token,
3626                                 size_t token_size)
3627 {
3628         size_t len;
3629
3630         len = next_token(buf);
3631         if (len < token_size) {
3632                 memcpy(token, *buf, len);
3633                 *(token + len) = '\0';
3634         }
3635         *buf += len;
3636
3637         return len;
3638 }
3639
3640 /*
3641  * Finds the next token in *buf, dynamically allocates a buffer big
3642  * enough to hold a copy of it, and copies the token into the new
3643  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3644  * that a duplicate buffer is created even for a zero-length token.
3645  *
3646  * Returns a pointer to the newly-allocated duplicate, or a null
3647  * pointer if memory for the duplicate was not available.  If
3648  * the lenp argument is a non-null pointer, the length of the token
3649  * (not including the '\0') is returned in *lenp.
3650  *
3651  * If successful, the *buf pointer will be updated to point beyond
3652  * the end of the found token.
3653  *
3654  * Note: uses GFP_KERNEL for allocation.
3655  */
3656 static inline char *dup_token(const char **buf, size_t *lenp)
3657 {
3658         char *dup;
3659         size_t len;
3660
3661         len = next_token(buf);
3662         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3663         if (!dup)
3664                 return NULL;
3665         *(dup + len) = '\0';
3666         *buf += len;
3667
3668         if (lenp)
3669                 *lenp = len;
3670
3671         return dup;
3672 }
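
/*
 * Illustration (not driver code): with *buf at "mypool myimage",
 * dup_token() returns a freshly allocated, NUL-terminated "mypool",
 * stores 6 through lenp if it was non-null, and leaves *buf pointing
 * at " myimage", ready for the next call.
 */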
3673
3674 /*
3675  * Parse the options provided for an "rbd add" (i.e., rbd image
3676  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3677  * and the data written is passed here via a NUL-terminated buffer.
3678  * Returns 0 if successful or an error code otherwise.
3679  *
3680  * The information extracted from these options is recorded in
3681  * the other parameters which return dynamically-allocated
3682  * structures:
3683  *  ceph_opts
3684  *      The address of a pointer that will refer to a ceph options
3685  *      structure.  Caller must release the returned pointer using
3686  *      ceph_destroy_options() when it is no longer needed.
3687  *  rbd_opts
3688  *      Address of an rbd options pointer.  Fully initialized by
3689  *      this function; caller must release with kfree().
3690  *  spec
3691  *      Address of an rbd image specification pointer.  Fully
3692  *      initialized by this function based on parsed options.
3693  *      Caller must release with rbd_spec_put().
3694  *
3695  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3697  * where:
3698  *  <mon_addrs>
3699  *      A comma-separated list of one or more monitor addresses.
3700  *      A monitor address is an ip address, optionally followed
3701  *      by a port number (separated by a colon).
3702  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3703  *  <options>
3704  *      A comma-separated list of ceph and/or rbd options.
3705  *  <pool_name>
3706  *      The name of the rados pool containing the rbd image.
3707  *  <image_name>
3708  *      The name of the image in that pool to map.
 *  <snap_name>
 *      The name of an optional snapshot.  If provided, the mapping
 *      will present data from the image at the time that snapshot
 *      was created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
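 *
 * For illustration only (the address and names are made up), a write
 * to /sys/bus/rbd/add might look like:
 *
 *   1.2.3.4:6789 name=admin rbd myimage mysnap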
3714  */
3715 static int rbd_add_parse_args(const char *buf,
3716                                 struct ceph_options **ceph_opts,
3717                                 struct rbd_options **opts,
3718                                 struct rbd_spec **rbd_spec)
3719 {
3720         size_t len;
3721         char *options;
3722         const char *mon_addrs;
3723         size_t mon_addrs_size;
3724         struct rbd_spec *spec = NULL;
3725         struct rbd_options *rbd_opts = NULL;
3726         struct ceph_options *copts;
3727         int ret;
3728
3729         /* The first four tokens are required */
3730
3731         len = next_token(&buf);
3732         if (!len) {
3733                 rbd_warn(NULL, "no monitor address(es) provided");
3734                 return -EINVAL;
3735         }
3736         mon_addrs = buf;
3737         mon_addrs_size = len + 1;
3738         buf += len;
3739
3740         ret = -EINVAL;
3741         options = dup_token(&buf, NULL);
3742         if (!options)
3743                 return -ENOMEM;
3744         if (!*options) {
3745                 rbd_warn(NULL, "no options provided");
3746                 goto out_err;
3747         }
3748
3749         spec = rbd_spec_alloc();
3750         if (!spec)
3751                 goto out_mem;
3752
3753         spec->pool_name = dup_token(&buf, NULL);
3754         if (!spec->pool_name)
3755                 goto out_mem;
3756         if (!*spec->pool_name) {
3757                 rbd_warn(NULL, "no pool name provided");
3758                 goto out_err;
3759         }
3760
3761         spec->image_name = dup_token(&buf, NULL);
3762         if (!spec->image_name)
3763                 goto out_mem;
3764         if (!*spec->image_name) {
3765                 rbd_warn(NULL, "no image name provided");
3766                 goto out_err;
3767         }
3768
3769         /*
3770          * Snapshot name is optional; default is to use "-"
3771          * (indicating the head/no snapshot).
3772          */
3773         len = next_token(&buf);
3774         if (!len) {
3775                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3776                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3777         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3778                 ret = -ENAMETOOLONG;
3779                 goto out_err;
3780         }
3781         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3782         if (!spec->snap_name)
3783                 goto out_mem;
3784         *(spec->snap_name + len) = '\0';
3785
3786         /* Initialize all rbd options to the defaults */
3787
3788         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3789         if (!rbd_opts)
3790                 goto out_mem;
3791
3792         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3793
3794         copts = ceph_parse_options(options, mon_addrs,
3795                                         mon_addrs + mon_addrs_size - 1,
3796                                         parse_rbd_opts_token, rbd_opts);
3797         if (IS_ERR(copts)) {
3798                 ret = PTR_ERR(copts);
3799                 goto out_err;
3800         }
3801         kfree(options);
3802
3803         *ceph_opts = copts;
3804         *opts = rbd_opts;
3805         *rbd_spec = spec;
3806
3807         return 0;
3808 out_mem:
3809         ret = -ENOMEM;
3810 out_err:
3811         kfree(rbd_opts);
3812         rbd_spec_put(spec);
3813         kfree(options);
3814
3815         return ret;
3816 }
3817
3818 /*
3819  * An rbd format 2 image has a unique identifier, distinct from the
3820  * name given to it by the user.  Internally, that identifier is
3821  * what's used to specify the names of objects related to the image.
3822  *
3823  * A special "rbd id" object is used to map an rbd image name to its
3824  * id.  If that object doesn't exist, then there is no v2 rbd image
3825  * with the supplied name.
3826  *
3827  * This function will record the given rbd_dev's image_id field if
3828  * it can be determined, and in that case will return 0.  If any
3829  * errors occur a negative errno will be returned and the rbd_dev's
3830  * image_id field will be unchanged (and should be NULL).
3831  */
3832 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3833 {
3834         int ret;
3835         size_t size;
3836         char *object_name;
3837         void *response;
3838         void *p;
3839
3840         /*
3841          * When probing a parent image, the image id is already
3842          * known (and the image name likely is not).  There's no
3843          * need to fetch the image id again in this case.
3844          */
3845         if (rbd_dev->spec->image_id)
3846                 return 0;
3847
3848         /*
3849          * First, see if the format 2 image id file exists, and if
3850          * so, get the image's persistent id from it.
3851          */
3852         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3853         object_name = kmalloc(size, GFP_NOIO);
3854         if (!object_name)
3855                 return -ENOMEM;
3856         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3857         dout("rbd id object name is %s\n", object_name);
3858
3859         /* Response will be an encoded string, which includes a length */
3860
3861         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3862         response = kzalloc(size, GFP_NOIO);
3863         if (!response) {
3864                 ret = -ENOMEM;
3865                 goto out;
3866         }
3867
3868         ret = rbd_obj_method_sync(rbd_dev, object_name,
3869                                 "rbd", "get_id",
3870                                 NULL, 0,
                                response, size, NULL);
3872         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3873         if (ret < 0)
3874                 goto out;
3875
3876         p = response;
3877         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + size,
3879                                                 NULL, GFP_NOIO);
3880         if (IS_ERR(rbd_dev->spec->image_id)) {
3881                 ret = PTR_ERR(rbd_dev->spec->image_id);
3882                 rbd_dev->spec->image_id = NULL;
3883         } else {
3884                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3885         }
3886 out:
3887         kfree(response);
3888         kfree(object_name);
3889
3890         return ret;
3891 }
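
/*
 * Illustration (not driver code): for a hypothetical format 2 image
 * named "myimage", the code above reads the object "rbd_id.myimage"
 * (RBD_ID_PREFIX plus the image name) and gets back an encoded id
 * such as "10026b8b4567".  That id, not the user-visible name, is
 * what names every other object belonging to the image.
 */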
3892
3893 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3894 {
3895         int ret;
3896         size_t size;
3897
3898         /* Version 1 images have no id; empty string is used */
3899
3900         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3901         if (!rbd_dev->spec->image_id)
3902                 return -ENOMEM;
3903
3904         /* Record the header object name for this rbd image. */
3905
3906         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3907         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3908         if (!rbd_dev->header_name) {
3909                 ret = -ENOMEM;
3910                 goto out_err;
3911         }
3912         sprintf(rbd_dev->header_name, "%s%s",
3913                 rbd_dev->spec->image_name, RBD_SUFFIX);
3914
3915         /* Populate rbd image metadata */
3916
3917         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3918         if (ret < 0)
3919                 goto out_err;
3920
3921         /* Version 1 images have no parent (no layering) */
3922
3923         rbd_dev->parent_spec = NULL;
3924         rbd_dev->parent_overlap = 0;
3925
3926         rbd_dev->image_format = 1;
3927
3928         dout("discovered version 1 image, header name is %s\n",
3929                 rbd_dev->header_name);
3930
3931         return 0;
3932
3933 out_err:
3934         kfree(rbd_dev->header_name);
3935         rbd_dev->header_name = NULL;
3936         kfree(rbd_dev->spec->image_id);
3937         rbd_dev->spec->image_id = NULL;
3938
3939         return ret;
3940 }
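
/*
 * Illustration (not driver code): for a hypothetical format 1 image
 * named "myimage", the header object constructed above is
 * "myimage.rbd" (the image name plus RBD_SUFFIX).
 */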
3941
3942 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3943 {
3944         size_t size;
3945         int ret;
3946         u64 ver = 0;
3947
3948         /*
3949          * Image id was filled in by the caller.  Record the header
3950          * object name for this rbd image.
3951          */
3952         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3953         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3954         if (!rbd_dev->header_name)
3955                 return -ENOMEM;
3956         sprintf(rbd_dev->header_name, "%s%s",
3957                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3958
3959         /* Get the size and object order for the image */
3960
3961         ret = rbd_dev_v2_image_size(rbd_dev);
3962         if (ret < 0)
3963                 goto out_err;
3964
3965         /* Get the object prefix (a.k.a. block_name) for the image */
3966
3967         ret = rbd_dev_v2_object_prefix(rbd_dev);
3968         if (ret < 0)
3969                 goto out_err;
3970
        /* Get and check the features for the image */
3972
3973         ret = rbd_dev_v2_features(rbd_dev);
3974         if (ret < 0)
3975                 goto out_err;
3976
3977         /* If the image supports layering, get the parent info */
3978
3979         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3980                 ret = rbd_dev_v2_parent_info(rbd_dev);
3981                 if (ret < 0)
3982                         goto out_err;
3983         }
3984
3985         /* crypto and compression type aren't (yet) supported for v2 images */
3986
3987         rbd_dev->header.crypt_type = 0;
3988         rbd_dev->header.comp_type = 0;
3989
3990         /* Get the snapshot context, plus the header version */
3991
3992         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3993         if (ret)
3994                 goto out_err;
3995         rbd_dev->header.obj_version = ver;
3996
3997         rbd_dev->image_format = 2;
3998
3999         dout("discovered version 2 image, header name is %s\n",
4000                 rbd_dev->header_name);
4001
4002         return 0;
4003 out_err:
4004         rbd_dev->parent_overlap = 0;
4005         rbd_spec_put(rbd_dev->parent_spec);
4006         rbd_dev->parent_spec = NULL;
4007         kfree(rbd_dev->header_name);
4008         rbd_dev->header_name = NULL;
4009         kfree(rbd_dev->header.object_prefix);
4010         rbd_dev->header.object_prefix = NULL;
4011
4012         return ret;
4013 }
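
/*
 * Illustration (not driver code): continuing the hypothetical image id
 * "10026b8b4567", the format 2 header object constructed above is
 * "rbd_header.10026b8b4567" (RBD_HEADER_PREFIX plus the image id).
 */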
4014
4015 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4016 {
4017         int ret;
4018
4019         /* no need to lock here, as rbd_dev is not registered yet */
4020         ret = rbd_dev_snaps_update(rbd_dev);
4021         if (ret)
4022                 return ret;
4023
4024         ret = rbd_dev_probe_update_spec(rbd_dev);
4025         if (ret)
4026                 goto err_out_snaps;
4027
4028         ret = rbd_dev_set_mapping(rbd_dev);
4029         if (ret)
4030                 goto err_out_snaps;
4031
4032         /* generate unique id: find highest unique id, add one */
4033         rbd_dev_id_get(rbd_dev);
4034
4035         /* Fill in the device name, now that we have its id. */
4036         BUILD_BUG_ON(DEV_NAME_LEN
4037                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4038         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4039
4040         /* Get our block major device number. */
4041
4042         ret = register_blkdev(0, rbd_dev->name);
4043         if (ret < 0)
4044                 goto err_out_id;
4045         rbd_dev->major = ret;
4046
4047         /* Set up the blkdev mapping. */
4048
4049         ret = rbd_init_disk(rbd_dev);
4050         if (ret)
4051                 goto err_out_blkdev;
4052
4053         ret = rbd_bus_add_dev(rbd_dev);
4054         if (ret)
4055                 goto err_out_disk;
4056
4057         /*
4058          * At this point cleanup in the event of an error is the job
4059          * of the sysfs code (initiated by rbd_bus_del_dev()).
4060          */
4061         down_write(&rbd_dev->header_rwsem);
4062         ret = rbd_dev_snaps_register(rbd_dev);
4063         up_write(&rbd_dev->header_rwsem);
4064         if (ret)
4065                 goto err_out_bus;
4066
4067         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4068         if (ret)
4069                 goto err_out_bus;
4070
4071         /* Everything's ready.  Announce the disk to the world. */
4072
4073         add_disk(rbd_dev->disk);
4074
4075         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4076                 (unsigned long long) rbd_dev->mapping.size);
4077
4078         return ret;
4079 err_out_bus:
4080         /* this will also clean up rest of rbd_dev stuff */
4081
4082         rbd_bus_del_dev(rbd_dev);
4083
4084         return ret;
4085 err_out_disk:
4086         rbd_free_disk(rbd_dev);
4087 err_out_blkdev:
4088         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4089 err_out_id:
4090         rbd_dev_id_put(rbd_dev);
4091 err_out_snaps:
4092         rbd_remove_all_snaps(rbd_dev);
4093
4094         return ret;
4095 }
4096
4097 /*
4098  * Probe for the existence of the header object for the given rbd
4099  * device.  For format 2 images this includes determining the image
4100  * id.
4101  */
4102 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4103 {
4104         int ret;
4105
4106         /*
4107          * Get the id from the image id object.  If it's not a
4108          * format 2 image, we'll get ENOENT back, and we'll assume
4109          * it's a format 1 image.
4110          */
4111         ret = rbd_dev_image_id(rbd_dev);
4112         if (ret)
4113                 ret = rbd_dev_v1_probe(rbd_dev);
4114         else
4115                 ret = rbd_dev_v2_probe(rbd_dev);
4116         if (ret) {
4117                 dout("probe failed, returning %d\n", ret);
4118
4119                 return ret;
4120         }
4121
4122         ret = rbd_dev_probe_finish(rbd_dev);
4123         if (ret)
4124                 rbd_header_free(&rbd_dev->header);
4125
4126         return ret;
4127 }
4128
4129 static ssize_t rbd_add(struct bus_type *bus,
4130                        const char *buf,
4131                        size_t count)
4132 {
4133         struct rbd_device *rbd_dev = NULL;
4134         struct ceph_options *ceph_opts = NULL;
4135         struct rbd_options *rbd_opts = NULL;
4136         struct rbd_spec *spec = NULL;
4137         struct rbd_client *rbdc;
4138         struct ceph_osd_client *osdc;
4139         int rc = -ENOMEM;
4140
4141         if (!try_module_get(THIS_MODULE))
4142                 return -ENODEV;
4143
4144         /* parse add command */
4145         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4146         if (rc < 0)
4147                 goto err_out_module;
4148
4149         rbdc = rbd_get_client(ceph_opts);
4150         if (IS_ERR(rbdc)) {
4151                 rc = PTR_ERR(rbdc);
4152                 goto err_out_args;
4153         }
4154         ceph_opts = NULL;       /* rbd_dev client now owns this */
4155
4156         /* pick the pool */
4157         osdc = &rbdc->client->osdc;
4158         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4159         if (rc < 0)
4160                 goto err_out_client;
4161         spec->pool_id = (u64) rc;
4162
4163         /* The ceph file layout needs to fit pool id in 32 bits */
4164
4165         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4166                 rc = -EIO;
4167                 goto err_out_client;
4168         }
4169
4170         rbd_dev = rbd_dev_create(rbdc, spec);
4171         if (!rbd_dev)
4172                 goto err_out_client;
4173         rbdc = NULL;            /* rbd_dev now owns this */
4174         spec = NULL;            /* rbd_dev now owns this */
4175
4176         rbd_dev->mapping.read_only = rbd_opts->read_only;
4177         kfree(rbd_opts);
4178         rbd_opts = NULL;        /* done with this */
4179
4180         rc = rbd_dev_probe(rbd_dev);
4181         if (rc < 0)
4182                 goto err_out_rbd_dev;
4183
4184         return count;
4185 err_out_rbd_dev:
4186         rbd_dev_destroy(rbd_dev);
4187 err_out_client:
4188         rbd_put_client(rbdc);
4189 err_out_args:
4190         if (ceph_opts)
4191                 ceph_destroy_options(ceph_opts);
4192         kfree(rbd_opts);
4193         rbd_spec_put(spec);
4194 err_out_module:
4195         module_put(THIS_MODULE);
4196
4197         dout("Error adding device %s\n", buf);
4198
4199         return (ssize_t) rc;
4200 }
4201
4202 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4203 {
4204         struct list_head *tmp;
4205         struct rbd_device *rbd_dev;
4206
4207         spin_lock(&rbd_dev_list_lock);
4208         list_for_each(tmp, &rbd_dev_list) {
4209                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4210                 if (rbd_dev->dev_id == dev_id) {
4211                         spin_unlock(&rbd_dev_list_lock);
4212                         return rbd_dev;
4213                 }
4214         }
4215         spin_unlock(&rbd_dev_list_lock);
4216         return NULL;
4217 }
4218
4219 static void rbd_dev_release(struct device *dev)
4220 {
4221         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4222
4223         if (rbd_dev->watch_event)
4224                 rbd_dev_header_watch_sync(rbd_dev, 0);
4225
4226         /* clean up and free blkdev */
4227         rbd_free_disk(rbd_dev);
4228         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4229
4230         /* release allocated disk header fields */
4231         rbd_header_free(&rbd_dev->header);
4232
4233         /* done with the id, and with the rbd_dev */
4234         rbd_dev_id_put(rbd_dev);
4235         rbd_assert(rbd_dev->rbd_client != NULL);
4236         rbd_dev_destroy(rbd_dev);
4237
4238         /* release module ref */
4239         module_put(THIS_MODULE);
4240 }
4241
4242 static ssize_t rbd_remove(struct bus_type *bus,
4243                           const char *buf,
4244                           size_t count)
4245 {
4246         struct rbd_device *rbd_dev = NULL;
4247         int target_id, rc;
4248         unsigned long ul;
4249         int ret = count;
4250
4251         rc = strict_strtoul(buf, 10, &ul);
4252         if (rc)
4253                 return rc;
4254
4255         /* convert to int; abort if we lost anything in the conversion */
4256         target_id = (int) ul;
4257         if (target_id != ul)
4258                 return -EINVAL;
4259
4260         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4261
4262         rbd_dev = __rbd_get_dev(target_id);
4263         if (!rbd_dev) {
4264                 ret = -ENOENT;
4265                 goto done;
4266         }
4267
4268         spin_lock_irq(&rbd_dev->lock);
4269         if (rbd_dev->open_count)
4270                 ret = -EBUSY;
4271         else
4272                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4273         spin_unlock_irq(&rbd_dev->lock);
4274         if (ret < 0)
4275                 goto done;
4276
4277         rbd_remove_all_snaps(rbd_dev);
4278         rbd_bus_del_dev(rbd_dev);
4279
4280 done:
4281         mutex_unlock(&ctl_mutex);
4282
4283         return ret;
4284 }
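
/*
 * Illustration (not driver code): a mapping is removed by writing its
 * device id (the N in /dev/rbdN) to the bus control file:
 *
 *   # echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is still open.
 */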
4285
4286 /*
4287  * create control files in sysfs
4288  * /sys/bus/rbd/...
4289  */
4290 static int rbd_sysfs_init(void)
4291 {
4292         int ret;
4293
4294         ret = device_register(&rbd_root_dev);
4295         if (ret < 0)
4296                 return ret;
4297
4298         ret = bus_register(&rbd_bus_type);
4299         if (ret < 0)
4300                 device_unregister(&rbd_root_dev);
4301
4302         return ret;
4303 }
4304
4305 static void rbd_sysfs_cleanup(void)
4306 {
4307         bus_unregister(&rbd_bus_type);
4308         device_unregister(&rbd_root_dev);
4309 }
4310
4311 static int __init rbd_init(void)
4312 {
4313         int rc;
4314
4315         if (!libceph_compatible(NULL)) {
4316                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4317
4318                 return -EINVAL;
4319         }
4320         rc = rbd_sysfs_init();
4321         if (rc)
4322                 return rc;
4323         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4324         return 0;
4325 }
4326
4327 static void __exit rbd_exit(void)
4328 {
4329         rbd_sysfs_cleanup();
4330 }
4331
4332 module_init(rbd_init);
4333 module_exit(rbd_exit);
4334
4335 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4336 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4337 MODULE_DESCRIPTION("rados block device");
4338
4339 /* following authorship retained from original osdblk.c */
4340 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4341
4342 MODULE_LICENSE("GPL");