rbd: get and check striping parameters
drivers/block/rbd.c (firefly-linux-kernel-4.4.55.git)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
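
/*
 * A quick worked example of the arithmetic these symbols encode: a
 * 4096-byte request spans 4096 >> SECTOR_SHIFT == 8 sectors, and
 * sector 8 starts at byte offset 8 << SECTOR_SHIFT == 4096.
 */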
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
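
/*
 * A rough sketch of the 4KB bound above, assuming a snapshot context
 * encoded as a u64 seq, a u32 count and one u64 id per snapshot:
 * 8 + 4 + 510 * 8 == 4092 bytes, the largest count that still fits
 * in 4096 (511 snapshots would need 4100).
 */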
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by an OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (0)
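
/*
 * A minimal sketch of how such a mask is typically used (the code
 * enforcing it lies outside this excerpt):
 *
 *	if (features & ~RBD_FEATURES_SUPPORTED)
 *		return -ENXIO;		(refuse the mapping)
 */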
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used to ensure that DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
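
/*
 * Worked example of the bound above, assuming a 4-byte int:
 * (5 * 4) / 2 + 1 == 11 characters, exactly enough for the widest
 * value, "-2147483648".
 */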
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 obj_version;
112 };
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the ids in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the ids associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
139 struct rbd_spec {
140         u64             pool_id;
141         char            *pool_name;
142
143         char            *image_id;
144         char            *image_name;
145
146         u64             snap_id;
147         char            *snap_name;
148
149         struct kref     kref;
150 };
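
/*
 * Illustrative example (hypothetical values): mapping image "foo" at
 * snapshot "snap1" in pool "rbd" produces a spec with pool_name "rbd",
 * image_name "foo" and snap_name "snap1", plus the looked-up pool_id,
 * image_id and snap_id.  A mapping of the writable HEAD instead uses
 * snap_id CEPH_NOSNAP and snap_name RBD_SNAP_HEAD_NAME ("-").
 */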
151
152 /*
153  * An instance of the client.  Multiple devices may share an rbd client.
154  */
155 struct rbd_client {
156         struct ceph_client      *client;
157         struct kref             kref;
158         struct list_head        node;
159 };
160
161 struct rbd_img_request;
162 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
163
164 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
165
166 struct rbd_obj_request;
167 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
168
169 enum obj_request_type {
170         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
171 };
172
173 enum obj_req_flags {
174         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
175         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
176         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
177         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
178 };
179
180 struct rbd_obj_request {
181         const char              *object_name;
182         u64                     offset;         /* object start byte */
183         u64                     length;         /* bytes from offset */
184         unsigned long           flags;
185
186         /*
187          * An object request associated with an image will have its
188          * img_data flag set; a standalone object request will not.
189          *
190          * A standalone object request will have which == BAD_WHICH
191          * and a null obj_request pointer.
192          *
193          * An object request initiated in support of a layered image
194          * object (to check for its existence before a write) will
195          * have which == BAD_WHICH and a non-null obj_request pointer.
196          *
197          * Finally, an object request for rbd image data will have
198          * which != BAD_WHICH, and will have a non-null img_request
199          * pointer.  The value of which will be in the range
200          * 0..(img_request->obj_request_count-1).
201          */
202         union {
203                 struct rbd_obj_request  *obj_request;   /* STAT op */
204                 struct {
205                         struct rbd_img_request  *img_request;
206                         u64                     img_offset;
207                         /* links for img_request->obj_requests list */
208                         struct list_head        links;
209                 };
210         };
211         u32                     which;          /* posn image request list */
212
213         enum obj_request_type   type;
214         union {
215                 struct bio      *bio_list;
216                 struct {
217                         struct page     **pages;
218                         u32             page_count;
219                 };
220         };
221         struct page             **copyup_pages;
222
223         struct ceph_osd_request *osd_req;
224
225         u64                     xferred;        /* bytes transferred */
226         u64                     version;
227         int                     result;
228
229         rbd_obj_callback_t      callback;
230         struct completion       completion;
231
232         struct kref             kref;
233 };
234
235 enum img_req_flags {
236         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
237         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
238         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
239 };
240
241 struct rbd_img_request {
242         struct rbd_device       *rbd_dev;
243         u64                     offset; /* starting image byte offset */
244         u64                     length; /* byte count from offset */
245         unsigned long           flags;
246         union {
247                 u64                     snap_id;        /* for reads */
248                 struct ceph_snap_context *snapc;        /* for writes */
249         };
250         union {
251                 struct request          *rq;            /* block request */
252                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
253         };
254         struct page             **copyup_pages;
255         spinlock_t              completion_lock;/* protects next_completion */
256         u32                     next_completion;
257         rbd_img_callback_t      callback;
258         u64                     xferred;/* aggregate bytes transferred */
259         int                     result; /* first nonzero obj_request result */
260
261         u32                     obj_request_count;
262         struct list_head        obj_requests;   /* rbd_obj_request structs */
263
264         struct kref             kref;
265 };
266
267 #define for_each_obj_request(ireq, oreq) \
268         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
269 #define for_each_obj_request_from(ireq, oreq) \
270         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
271 #define for_each_obj_request_safe(ireq, oreq, n) \
272         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
273
274 struct rbd_snap {
275         struct  device          dev;
276         const char              *name;
277         u64                     size;
278         struct list_head        node;
279         u64                     id;
280         u64                     features;
281 };
282
283 struct rbd_mapping {
284         u64                     size;
285         u64                     features;
286         bool                    read_only;
287 };
288
289 /*
290  * a single device
291  */
292 struct rbd_device {
293         int                     dev_id;         /* blkdev unique id */
294
295         int                     major;          /* blkdev assigned major */
296         struct gendisk          *disk;          /* blkdev's gendisk and rq */
297
298         u32                     image_format;   /* Either 1 or 2 */
299         struct rbd_client       *rbd_client;
300
301         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
302
303         spinlock_t              lock;           /* queue, flags, open_count */
304
305         struct rbd_image_header header;
306         unsigned long           flags;          /* possibly lock protected */
307         struct rbd_spec         *spec;
308
309         char                    *header_name;
310
311         struct ceph_file_layout layout;
312
313         struct ceph_osd_event   *watch_event;
314         struct rbd_obj_request  *watch_request;
315
316         struct rbd_spec         *parent_spec;
317         u64                     parent_overlap;
318         struct rbd_device       *parent;
319
320         u64                     stripe_unit;
321         u64                     stripe_count;
322
323         /* protects updating the header */
324         struct rw_semaphore     header_rwsem;
325
326         struct rbd_mapping      mapping;
327
328         struct list_head        node;
329
330         /* list of snapshots */
331         struct list_head        snaps;
332
333         /* sysfs related */
334         struct device           dev;
335         unsigned long           open_count;     /* protected by lock */
336 };
337
338 /*
339  * Flag bits for rbd_dev->flags.  If atomicity is required,
340  * rbd_dev->lock is used to protect access.
341  *
342  * Currently, only the "removing" flag (which is coupled with the
343  * "open_count" field) requires atomic access.
344  */
345 enum rbd_dev_flags {
346         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
347         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
348 };
349
350 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
351
352 static LIST_HEAD(rbd_dev_list);    /* devices */
353 static DEFINE_SPINLOCK(rbd_dev_list_lock);
354
355 static LIST_HEAD(rbd_client_list);              /* clients */
356 static DEFINE_SPINLOCK(rbd_client_list_lock);
357
358 static int rbd_img_request_submit(struct rbd_img_request *img_request);
359
360 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
361 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
362
363 static void rbd_dev_release(struct device *dev);
364 static void rbd_remove_snap_dev(struct rbd_snap *snap);
365
366 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
367                        size_t count);
368 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
369                           size_t count);
370 static int rbd_dev_probe(struct rbd_device *rbd_dev);
371
372 static struct bus_attribute rbd_bus_attrs[] = {
373         __ATTR(add, S_IWUSR, NULL, rbd_add),
374         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
375         __ATTR_NULL
376 };
377
378 static struct bus_type rbd_bus_type = {
379         .name           = "rbd",
380         .bus_attrs      = rbd_bus_attrs,
381 };
382
383 static void rbd_root_dev_release(struct device *dev)
384 {
385 }
386
387 static struct device rbd_root_dev = {
388         .init_name =    "rbd",
389         .release =      rbd_root_dev_release,
390 };
391
392 static __printf(2, 3)
393 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
394 {
395         struct va_format vaf;
396         va_list args;
397
398         va_start(args, fmt);
399         vaf.fmt = fmt;
400         vaf.va = &args;
401
402         if (!rbd_dev)
403                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
404         else if (rbd_dev->disk)
405                 printk(KERN_WARNING "%s: %s: %pV\n",
406                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
407         else if (rbd_dev->spec && rbd_dev->spec->image_name)
408                 printk(KERN_WARNING "%s: image %s: %pV\n",
409                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
410         else if (rbd_dev->spec && rbd_dev->spec->image_id)
411                 printk(KERN_WARNING "%s: id %s: %pV\n",
412                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
413         else    /* punt */
414                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
415                         RBD_DRV_NAME, rbd_dev, &vaf);
416         va_end(args);
417 }
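
/*
 * Usage note: a null rbd_dev is legal and falls through to the bare
 * driver-name prefix above; rbd_warn(NULL, "bio_chain exhausted with
 * %u left", len) later in this file relies on exactly that.
 */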
418
419 #ifdef RBD_DEBUG
420 #define rbd_assert(expr)                                                \
421                 if (unlikely(!(expr))) {                                \
422                         printk(KERN_ERR "\nAssertion failure in %s() "  \
423                                                 "at line %d:\n\n"       \
424                                         "\trbd_assert(%s);\n\n",        \
425                                         __func__, __LINE__, #expr);     \
426                         BUG();                                          \
427                 }
428 #else /* !RBD_DEBUG */
429 #  define rbd_assert(expr)      ((void) 0)
430 #endif /* !RBD_DEBUG */
431
432 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
433 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
434
435 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
436 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
437
438 static int rbd_open(struct block_device *bdev, fmode_t mode)
439 {
440         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
441         bool removing = false;
442
443         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
444                 return -EROFS;
445
446         spin_lock_irq(&rbd_dev->lock);
447         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
448                 removing = true;
449         else
450                 rbd_dev->open_count++;
451         spin_unlock_irq(&rbd_dev->lock);
452         if (removing)
453                 return -ENOENT;
454
455         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
456         (void) get_device(&rbd_dev->dev);
457         set_device_ro(bdev, rbd_dev->mapping.read_only);
458         mutex_unlock(&ctl_mutex);
459
460         return 0;
461 }
462
463 static int rbd_release(struct gendisk *disk, fmode_t mode)
464 {
465         struct rbd_device *rbd_dev = disk->private_data;
466         unsigned long open_count_before;
467
468         spin_lock_irq(&rbd_dev->lock);
469         open_count_before = rbd_dev->open_count--;
470         spin_unlock_irq(&rbd_dev->lock);
471         rbd_assert(open_count_before > 0);
472
473         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
474         put_device(&rbd_dev->dev);
475         mutex_unlock(&ctl_mutex);
476
477         return 0;
478 }
479
480 static const struct block_device_operations rbd_bd_ops = {
481         .owner                  = THIS_MODULE,
482         .open                   = rbd_open,
483         .release                = rbd_release,
484 };
485
486 /*
487  * Initialize an rbd client instance.
488  * We own *ceph_opts.
489  */
490 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
491 {
492         struct rbd_client *rbdc;
493         int ret = -ENOMEM;
494
495         dout("%s:\n", __func__);
496         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
497         if (!rbdc)
498                 goto out_opt;
499
500         kref_init(&rbdc->kref);
501         INIT_LIST_HEAD(&rbdc->node);
502
503         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
504
505         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
506         if (IS_ERR(rbdc->client))
507                 goto out_mutex;
508         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
509
510         ret = ceph_open_session(rbdc->client);
511         if (ret < 0)
512                 goto out_err;
513
514         spin_lock(&rbd_client_list_lock);
515         list_add_tail(&rbdc->node, &rbd_client_list);
516         spin_unlock(&rbd_client_list_lock);
517
518         mutex_unlock(&ctl_mutex);
519         dout("%s: rbdc %p\n", __func__, rbdc);
520
521         return rbdc;
522
523 out_err:
524         ceph_destroy_client(rbdc->client);
525 out_mutex:
526         mutex_unlock(&ctl_mutex);
527         kfree(rbdc);
528 out_opt:
529         if (ceph_opts)
530                 ceph_destroy_options(ceph_opts);
531         dout("%s: error %d\n", __func__, ret);
532
533         return ERR_PTR(ret);
534 }
535
536 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
537 {
538         kref_get(&rbdc->kref);
539
540         return rbdc;
541 }
542
543 /*
544  * Find a ceph client with a specific address and configuration.  If
545  * found, bump its reference count.
546  */
547 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
548 {
549         struct rbd_client *client_node;
550         bool found = false;
551
552         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
553                 return NULL;
554
555         spin_lock(&rbd_client_list_lock);
556         list_for_each_entry(client_node, &rbd_client_list, node) {
557                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
558                         __rbd_get_client(client_node);
559
560                         found = true;
561                         break;
562                 }
563         }
564         spin_unlock(&rbd_client_list_lock);
565
566         return found ? client_node : NULL;
567 }
568
569 /*
570  * mount options
571  */
572 enum {
573         Opt_last_int,
574         /* int args above */
575         Opt_last_string,
576         /* string args above */
577         Opt_read_only,
578         Opt_read_write,
579         /* Boolean args above */
580         Opt_last_bool,
581 };
582
583 static match_table_t rbd_opts_tokens = {
584         /* int args above */
585         /* string args above */
586         {Opt_read_only, "read_only"},
587         {Opt_read_only, "ro"},          /* Alternate spelling */
588         {Opt_read_write, "read_write"},
589         {Opt_read_write, "rw"},         /* Alternate spelling */
590         /* Boolean args above */
591         {-1, NULL}
592 };
593
594 struct rbd_options {
595         bool    read_only;
596 };
597
598 #define RBD_READ_ONLY_DEFAULT   false
599
600 static int parse_rbd_opts_token(char *c, void *private)
601 {
602         struct rbd_options *rbd_opts = private;
603         substring_t argstr[MAX_OPT_ARGS];
604         int token, intval, ret;
605
606         token = match_token(c, rbd_opts_tokens, argstr);
607         if (token < 0)
608                 return -EINVAL;
609
610         if (token < Opt_last_int) {
611                 ret = match_int(&argstr[0], &intval);
612                 if (ret < 0) {
613                         pr_err("bad mount option arg (not int) "
614                                "at '%s'\n", c);
615                         return ret;
616                 }
617                 dout("got int token %d val %d\n", token, intval);
618         } else if (token > Opt_last_int && token < Opt_last_string) {
619                 dout("got string token %d val %s\n", token,
620                      argstr[0].from);
621         } else if (token > Opt_last_string && token < Opt_last_bool) {
622                 dout("got Boolean token %d\n", token);
623         } else {
624                 dout("got token %d\n", token);
625         }
626
627         switch (token) {
628         case Opt_read_only:
629                 rbd_opts->read_only = true;
630                 break;
631         case Opt_read_write:
632                 rbd_opts->read_only = false;
633                 break;
634         default:
635                 rbd_assert(false);
636                 break;
637         }
638         return 0;
639 }
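
/*
 * Example of the parsing above: mapping with option "read_only" (or
 * its alternate spelling "ro") sets rbd_opts->read_only to true, and
 * "read_write" (or "rw") clears it; the starting value is
 * RBD_READ_ONLY_DEFAULT (false).
 */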
640
641 /*
642  * Get a ceph client with a specific address and configuration; if one
643  * does not exist, create it.
644  */
645 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
646 {
647         struct rbd_client *rbdc;
648
649         rbdc = rbd_client_find(ceph_opts);
650         if (rbdc)       /* using an existing client */
651                 ceph_destroy_options(ceph_opts);
652         else
653                 rbdc = rbd_client_create(ceph_opts);
654
655         return rbdc;
656 }
657
658 /*
659  * Destroy ceph client
660  *
661  * Acquires rbd_client_list_lock, so the caller must not hold it.
662  */
663 static void rbd_client_release(struct kref *kref)
664 {
665         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
666
667         dout("%s: rbdc %p\n", __func__, rbdc);
668         spin_lock(&rbd_client_list_lock);
669         list_del(&rbdc->node);
670         spin_unlock(&rbd_client_list_lock);
671
672         ceph_destroy_client(rbdc->client);
673         kfree(rbdc);
674 }
675
676 /*
677  * Drop a reference to a ceph client node; if it's no longer
678  * referenced, release it.
679  */
680 static void rbd_put_client(struct rbd_client *rbdc)
681 {
682         if (rbdc)
683                 kref_put(&rbdc->kref, rbd_client_release);
684 }
685
686 static bool rbd_image_format_valid(u32 image_format)
687 {
688         return image_format == 1 || image_format == 2;
689 }
690
691 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
692 {
693         size_t size;
694         u32 snap_count;
695
696         /* The header has to start with the magic rbd header text */
697         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
698                 return false;
699
700         /* The bio layer requires at least sector-sized I/O */
701
702         if (ondisk->options.order < SECTOR_SHIFT)
703                 return false;
704
705         /* If we use u64 in a few spots we may be able to loosen this */
706
707         if (ondisk->options.order > 8 * sizeof (int) - 1)
708                 return false;
709
710         /*
711          * The size of a snapshot header has to fit in a size_t, and
712          * that limits the number of snapshots.
713          */
714         snap_count = le32_to_cpu(ondisk->snap_count);
715         size = SIZE_MAX - sizeof (struct ceph_snap_context);
716         if (snap_count > size / sizeof (__le64))
717                 return false;
718
719         /*
720  * Not only that, but the size of the entire snapshot
721          * header must also be representable in a size_t.
722          */
723         size -= snap_count * sizeof (__le64);
724         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
725                 return false;
726
727         return true;
728 }
729
730 /*
731  * Create a new header structure, translate header format from the on-disk
732  * header.
733  */
734 static int rbd_header_from_disk(struct rbd_image_header *header,
735                                  struct rbd_image_header_ondisk *ondisk)
736 {
737         u32 snap_count;
738         size_t len;
739         size_t size;
740         u32 i;
741
742         memset(header, 0, sizeof (*header));
743
744         snap_count = le32_to_cpu(ondisk->snap_count);
745
746         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
747         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
748         if (!header->object_prefix)
749                 return -ENOMEM;
750         memcpy(header->object_prefix, ondisk->object_prefix, len);
751         header->object_prefix[len] = '\0';
752
753         if (snap_count) {
754                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
755
756                 /* Save a copy of the snapshot names */
757
758                 if (snap_names_len > (u64) SIZE_MAX)
759                         return -EIO;
760                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
761                 if (!header->snap_names)
762                         goto out_err;
763                 /*
764                  * Note that rbd_dev_v1_header_read() guarantees
765                  * the ondisk buffer we're working with has
766                  * snap_names_len bytes beyond the end of the
767                  * snapshot id array, so this memcpy() is safe.
768                  */
769                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
770                         snap_names_len);
771
772                 /* Record each snapshot's size */
773
774                 size = snap_count * sizeof (*header->snap_sizes);
775                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
776                 if (!header->snap_sizes)
777                         goto out_err;
778                 for (i = 0; i < snap_count; i++)
779                         header->snap_sizes[i] =
780                                 le64_to_cpu(ondisk->snaps[i].image_size);
781         } else {
782                 WARN_ON(ondisk->snap_names_len);
783                 header->snap_names = NULL;
784                 header->snap_sizes = NULL;
785         }
786
787         header->features = 0;   /* No feature support in v1 images */
788         header->obj_order = ondisk->options.order;
789         header->crypt_type = ondisk->options.crypt_type;
790         header->comp_type = ondisk->options.comp_type;
791
792         /* Allocate and fill in the snapshot context */
793
794         header->image_size = le64_to_cpu(ondisk->image_size);
795         size = sizeof (struct ceph_snap_context);
796         size += snap_count * sizeof (header->snapc->snaps[0]);
797         header->snapc = kzalloc(size, GFP_KERNEL);
798         if (!header->snapc)
799                 goto out_err;
800
801         atomic_set(&header->snapc->nref, 1);
802         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
803         header->snapc->num_snaps = snap_count;
804         for (i = 0; i < snap_count; i++)
805                 header->snapc->snaps[i] =
806                         le64_to_cpu(ondisk->snaps[i].id);
807
808         return 0;
809
810 out_err:
811         kfree(header->snap_sizes);
812         header->snap_sizes = NULL;
813         kfree(header->snap_names);
814         header->snap_names = NULL;
815         kfree(header->object_prefix);
816         header->object_prefix = NULL;
817
818         return -ENOMEM;
819 }
820
821 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
822 {
823         struct rbd_snap *snap;
824
825         if (snap_id == CEPH_NOSNAP)
826                 return RBD_SNAP_HEAD_NAME;
827
828         list_for_each_entry(snap, &rbd_dev->snaps, node)
829                 if (snap_id == snap->id)
830                         return snap->name;
831
832         return NULL;
833 }
834
835 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
836 {
837
838         struct rbd_snap *snap;
839
840         list_for_each_entry(snap, &rbd_dev->snaps, node) {
841                 if (!strcmp(snap_name, snap->name)) {
842                         rbd_dev->spec->snap_id = snap->id;
843                         rbd_dev->mapping.size = snap->size;
844                         rbd_dev->mapping.features = snap->features;
845
846                         return 0;
847                 }
848         }
849
850         return -ENOENT;
851 }
852
853 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
854 {
855         int ret;
856
857         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
858                     sizeof (RBD_SNAP_HEAD_NAME))) {
859                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
860                 rbd_dev->mapping.size = rbd_dev->header.image_size;
861                 rbd_dev->mapping.features = rbd_dev->header.features;
862                 ret = 0;
863         } else {
864                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
865                 if (ret < 0)
866                         goto done;
867                 rbd_dev->mapping.read_only = true;
868         }
869         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
870
871 done:
872         return ret;
873 }
874
875 static void rbd_header_free(struct rbd_image_header *header)
876 {
877         kfree(header->object_prefix);
878         header->object_prefix = NULL;
879         kfree(header->snap_sizes);
880         header->snap_sizes = NULL;
881         kfree(header->snap_names);
882         header->snap_names = NULL;
883         ceph_put_snap_context(header->snapc);
884         header->snapc = NULL;
885 }
886
887 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
888 {
889         char *name;
890         u64 segment;
891         int ret;
892
893         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
894         if (!name)
895                 return NULL;
896         segment = offset >> rbd_dev->header.obj_order;
897         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
898                         rbd_dev->header.object_prefix, segment);
899         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
900                 pr_err("error formatting segment name for #%llu (%d)\n",
901                         segment, ret);
902                 kfree(name);
903                 name = NULL;
904         }
905
906         return name;
907 }
908
909 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
910 {
911         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
912
913         return offset & (segment_size - 1);
914 }
915
916 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
917                                 u64 offset, u64 length)
918 {
919         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
920
921         offset &= segment_size - 1;
922
923         rbd_assert(length <= U64_MAX - offset);
924         if (offset + length > segment_size)
925                 length = segment_size - offset;
926
927         return length;
928 }
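
/*
 * Worked example (hypothetical values): with an object order of 22
 * (4 MiB objects), image byte offset 0x00c12345 falls in segment
 * 0x00c12345 >> 22 == 3, at offset 0x00c12345 & 0x3fffff == 0x12345
 * within that object, and rbd_segment_name() above would format it
 * as "<object_prefix>.000000000003".
 */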
929
930 /*
931  * returns the size of an object in the image
932  */
933 static u64 rbd_obj_bytes(struct rbd_image_header *header)
934 {
935         return 1 << header->obj_order;
936 }
937
938 /*
939  * bio helpers
940  */
941
942 static void bio_chain_put(struct bio *chain)
943 {
944         struct bio *tmp;
945
946         while (chain) {
947                 tmp = chain;
948                 chain = chain->bi_next;
949                 bio_put(tmp);
950         }
951 }
952
953 /*
954  * zeros a bio chain, starting at a specific offset
955  */
956 static void zero_bio_chain(struct bio *chain, int start_ofs)
957 {
958         struct bio_vec *bv;
959         unsigned long flags;
960         void *buf;
961         int i;
962         int pos = 0;
963
964         while (chain) {
965                 bio_for_each_segment(bv, chain, i) {
966                         if (pos + bv->bv_len > start_ofs) {
967                                 int remainder = max(start_ofs - pos, 0);
968                                 buf = bvec_kmap_irq(bv, &flags);
969                                 memset(buf + remainder, 0,
970                                        bv->bv_len - remainder);
971                                 bvec_kunmap_irq(buf, &flags);
972                         }
973                         pos += bv->bv_len;
974                 }
975
976                 chain = chain->bi_next;
977         }
978 }
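
/*
 * Usage note: the read completion path later in this file calls
 * zero_bio_chain(bio_list, 0) to zero-fill an entire hole (-ENOENT)
 * and zero_bio_chain(bio_list, xferred) to zero-fill just the tail
 * of a short read.
 */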
979
980 /*
981  * similar to zero_bio_chain(), zeros data defined by a page array,
982  * starting at the given byte offset from the start of the array and
983  * continuing up to the given end offset.  The pages array is
984  * assumed to be big enough to hold all bytes up to the end.
985  */
986 static void zero_pages(struct page **pages, u64 offset, u64 end)
987 {
988         struct page **page = &pages[offset >> PAGE_SHIFT];
989
990         rbd_assert(end > offset);
991         rbd_assert(end - offset <= (u64)SIZE_MAX);
992         while (offset < end) {
993                 size_t page_offset;
994                 size_t length;
995                 unsigned long flags;
996                 void *kaddr;
997
998                 page_offset = (size_t)(offset & ~PAGE_MASK);
999                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1000                 local_irq_save(flags);
1001                 kaddr = kmap_atomic(*page);
1002                 memset(kaddr + page_offset, 0, length);
1003                 kunmap_atomic(kaddr);
1004                 local_irq_restore(flags);
1005
1006                 offset += length;
1007                 page++;
1008         }
1009 }
1010
1011 /*
1012  * Clone a portion of a bio, starting at the given byte offset
1013  * and continuing for the number of bytes indicated.
1014  */
1015 static struct bio *bio_clone_range(struct bio *bio_src,
1016                                         unsigned int offset,
1017                                         unsigned int len,
1018                                         gfp_t gfpmask)
1019 {
1020         struct bio_vec *bv;
1021         unsigned int resid;
1022         unsigned short idx;
1023         unsigned int voff;
1024         unsigned short end_idx;
1025         unsigned short vcnt;
1026         struct bio *bio;
1027
1028         /* Handle the easy case for the caller */
1029
1030         if (!offset && len == bio_src->bi_size)
1031                 return bio_clone(bio_src, gfpmask);
1032
1033         if (WARN_ON_ONCE(!len))
1034                 return NULL;
1035         if (WARN_ON_ONCE(len > bio_src->bi_size))
1036                 return NULL;
1037         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1038                 return NULL;
1039
1040         /* Find first affected segment... */
1041
1042         resid = offset;
1043         __bio_for_each_segment(bv, bio_src, idx, 0) {
1044                 if (resid < bv->bv_len)
1045                         break;
1046                 resid -= bv->bv_len;
1047         }
1048         voff = resid;
1049
1050         /* ...and the last affected segment */
1051
1052         resid += len;
1053         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1054                 if (resid <= bv->bv_len)
1055                         break;
1056                 resid -= bv->bv_len;
1057         }
1058         vcnt = end_idx - idx + 1;
1059
1060         /* Build the clone */
1061
1062         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1063         if (!bio)
1064                 return NULL;    /* ENOMEM */
1065
1066         bio->bi_bdev = bio_src->bi_bdev;
1067         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1068         bio->bi_rw = bio_src->bi_rw;
1069         bio->bi_flags |= 1 << BIO_CLONED;
1070
1071         /*
1072          * Copy over our part of the bio_vec, then update the first
1073          * and last (or only) entries.
1074          */
1075         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1076                         vcnt * sizeof (struct bio_vec));
1077         bio->bi_io_vec[0].bv_offset += voff;
1078         if (vcnt > 1) {
1079                 bio->bi_io_vec[0].bv_len -= voff;
1080                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1081         } else {
1082                 bio->bi_io_vec[0].bv_len = len;
1083         }
1084
1085         bio->bi_vcnt = vcnt;
1086         bio->bi_size = len;
1087         bio->bi_idx = 0;
1088
1089         return bio;
1090 }
1091
1092 /*
1093  * Clone a portion of a bio chain, starting at the given byte offset
1094  * into the first bio in the source chain and continuing for the
1095  * number of bytes indicated.  The result is another bio chain of
1096  * exactly the given length, or a null pointer on error.
1097  *
1098  * The bio_src and offset parameters are both in-out.  On entry they
1099  * refer to the first source bio and the offset into that bio where
1100  * the start of data to be cloned is located.
1101  *
1102  * On return, bio_src is updated to refer to the bio in the source
1103  * chain that contains the first un-cloned byte, and *offset will
1104  * contain the offset of that byte within that bio.
1105  */
1106 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1107                                         unsigned int *offset,
1108                                         unsigned int len,
1109                                         gfp_t gfpmask)
1110 {
1111         struct bio *bi = *bio_src;
1112         unsigned int off = *offset;
1113         struct bio *chain = NULL;
1114         struct bio **end;
1115
1116         /* Build up a chain of clone bios up to the limit */
1117
1118         if (!bi || off >= bi->bi_size || !len)
1119                 return NULL;            /* Nothing to clone */
1120
1121         end = &chain;
1122         while (len) {
1123                 unsigned int bi_size;
1124                 struct bio *bio;
1125
1126                 if (!bi) {
1127                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1128                         goto out_err;   /* EINVAL; ran out of bios */
1129                 }
1130                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1131                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1132                 if (!bio)
1133                         goto out_err;   /* ENOMEM */
1134
1135                 *end = bio;
1136                 end = &bio->bi_next;
1137
1138                 off += bi_size;
1139                 if (off == bi->bi_size) {
1140                         bi = bi->bi_next;
1141                         off = 0;
1142                 }
1143                 len -= bi_size;
1144         }
1145         *bio_src = bi;
1146         *offset = off;
1147
1148         return chain;
1149 out_err:
1150         bio_chain_put(chain);
1151
1152         return NULL;
1153 }
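
/*
 * Worked example (hypothetical sizes): cloning 3000 bytes starting
 * 1000 bytes into a chain of two 2048-byte bios yields a two-bio
 * clone chain (1048 bytes from the first source bio, 1952 from the
 * second); on return *bio_src points to the second source bio and
 * *offset is 1952.
 */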
1154
1155 /*
1156  * The default/initial value for all object request flags is 0.  For
1157  * each flag, once its value is set to 1 it is never reset to 0
1158  * again.
1159  */
1160 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1161 {
1162         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1163                 struct rbd_device *rbd_dev;
1164
1165                 rbd_dev = obj_request->img_request->rbd_dev;
1166                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1167                         obj_request);
1168         }
1169 }
1170
1171 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1172 {
1173         smp_mb();
1174         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1175 }
1176
1177 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1178 {
1179         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1180                 struct rbd_device *rbd_dev = NULL;
1181
1182                 if (obj_request_img_data_test(obj_request))
1183                         rbd_dev = obj_request->img_request->rbd_dev;
1184                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1185                         obj_request);
1186         }
1187 }
1188
1189 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1190 {
1191         smp_mb();
1192         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1193 }
1194
1195 /*
1196  * This sets the KNOWN flag after (possibly) setting the EXISTS
1197  * flag.  The latter is set based on the "exists" value provided.
1198  *
1199  * Note that for our purposes once an object exists it never goes
1200  * away again.  It's possible that the responses from two existence
1201  * checks are separated by the creation of the target object, and
1202  * the first ("doesn't exist") response arrives *after* the second
1203  * ("does exist").  In that case we ignore the second one.
1204  */
1205 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1206                                 bool exists)
1207 {
1208         if (exists)
1209                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1210         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1211         smp_mb();
1212 }
1213
1214 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1215 {
1216         smp_mb();
1217         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1218 }
1219
1220 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1221 {
1222         smp_mb();
1223         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1224 }
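
/*
 * Example of the "never goes away" rule above: if
 * obj_request_existence_set(obj_request, true) runs first, a late
 * obj_request_existence_set(obj_request, false) merely re-sets the
 * already-set KNOWN bit; EXISTS survives, so the stale "doesn't
 * exist" answer is ignored.
 */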
1225
1226 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1227 {
1228         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1229                 atomic_read(&obj_request->kref.refcount));
1230         kref_get(&obj_request->kref);
1231 }
1232
1233 static void rbd_obj_request_destroy(struct kref *kref);
1234 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1235 {
1236         rbd_assert(obj_request != NULL);
1237         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1238                 atomic_read(&obj_request->kref.refcount));
1239         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1240 }
1241
1242 static void rbd_img_request_get(struct rbd_img_request *img_request)
1243 {
1244         dout("%s: img %p (was %d)\n", __func__, img_request,
1245                 atomic_read(&img_request->kref.refcount));
1246         kref_get(&img_request->kref);
1247 }
1248
1249 static void rbd_img_request_destroy(struct kref *kref);
1250 static void rbd_img_request_put(struct rbd_img_request *img_request)
1251 {
1252         rbd_assert(img_request != NULL);
1253         dout("%s: img %p (was %d)\n", __func__, img_request,
1254                 atomic_read(&img_request->kref.refcount));
1255         kref_put(&img_request->kref, rbd_img_request_destroy);
1256 }
1257
1258 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1259                                         struct rbd_obj_request *obj_request)
1260 {
1261         rbd_assert(obj_request->img_request == NULL);
1262
1263         /* Image request now owns object's original reference */
1264         obj_request->img_request = img_request;
1265         obj_request->which = img_request->obj_request_count;
1266         rbd_assert(!obj_request_img_data_test(obj_request));
1267         obj_request_img_data_set(obj_request);
1268         rbd_assert(obj_request->which != BAD_WHICH);
1269         img_request->obj_request_count++;
1270         list_add_tail(&obj_request->links, &img_request->obj_requests);
1271         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1272                 obj_request->which);
1273 }
1274
1275 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1276                                         struct rbd_obj_request *obj_request)
1277 {
1278         rbd_assert(obj_request->which != BAD_WHICH);
1279
1280         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1281                 obj_request->which);
1282         list_del(&obj_request->links);
1283         rbd_assert(img_request->obj_request_count > 0);
1284         img_request->obj_request_count--;
1285         rbd_assert(obj_request->which == img_request->obj_request_count);
1286         obj_request->which = BAD_WHICH;
1287         rbd_assert(obj_request_img_data_test(obj_request));
1288         rbd_assert(obj_request->img_request == img_request);
1289         obj_request->img_request = NULL;
1290         obj_request->callback = NULL;
1291         rbd_obj_request_put(obj_request);
1292 }
1293
1294 static bool obj_request_type_valid(enum obj_request_type type)
1295 {
1296         switch (type) {
1297         case OBJ_REQUEST_NODATA:
1298         case OBJ_REQUEST_BIO:
1299         case OBJ_REQUEST_PAGES:
1300                 return true;
1301         default:
1302                 return false;
1303         }
1304 }
1305
1306 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1307                                 struct rbd_obj_request *obj_request)
1308 {
1309         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1310
1311         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1312 }
1313
1314 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1315 {
1316
1317         dout("%s: img %p\n", __func__, img_request);
1318
1319         /*
1320          * If no error occurred, compute the aggregate transfer
1321          * count for the image request.  We could instead use
1322          * atomic64_cmpxchg() to update it as each object request
1323          * completes; it's not clear offhand which way is better.
1324          */
1325         if (!img_request->result) {
1326                 struct rbd_obj_request *obj_request;
1327                 u64 xferred = 0;
1328
1329                 for_each_obj_request(img_request, obj_request)
1330                         xferred += obj_request->xferred;
1331                 img_request->xferred = xferred;
1332         }
1333
1334         if (img_request->callback)
1335                 img_request->callback(img_request);
1336         else
1337                 rbd_img_request_put(img_request);
1338 }
1339
1340 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1341
1342 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1343 {
1344         dout("%s: obj %p\n", __func__, obj_request);
1345
1346         return wait_for_completion_interruptible(&obj_request->completion);
1347 }
1348
1349 /*
1350  * The default/initial value for all image request flags is 0.  Each
1351  * is conditionally set to 1 at image request initialization time
1352  * and currently never changes thereafter.
1353  */
1354 static void img_request_write_set(struct rbd_img_request *img_request)
1355 {
1356         set_bit(IMG_REQ_WRITE, &img_request->flags);
1357         smp_mb();
1358 }
1359
1360 static bool img_request_write_test(struct rbd_img_request *img_request)
1361 {
1362         smp_mb();
1363         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1364 }
1365
1366 static void img_request_child_set(struct rbd_img_request *img_request)
1367 {
1368         set_bit(IMG_REQ_CHILD, &img_request->flags);
1369         smp_mb();
1370 }
1371
1372 static bool img_request_child_test(struct rbd_img_request *img_request)
1373 {
1374         smp_mb();
1375         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1376 }
1377
1378 static void img_request_layered_set(struct rbd_img_request *img_request)
1379 {
1380         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1381         smp_mb();
1382 }
1383
1384 static bool img_request_layered_test(struct rbd_img_request *img_request)
1385 {
1386         smp_mb();
1387         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1388 }
1389
1390 static void
1391 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1392 {
1393         u64 xferred = obj_request->xferred;
1394         u64 length = obj_request->length;
1395
1396         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1397                 obj_request, obj_request->img_request, obj_request->result,
1398                 xferred, length);
1399         /*
1400          * ENOENT means a hole in the image.  We zero-fill the
1401          * entire length of the request.  A short read also implies
1402          * zero-fill to the end of the request.  Either way we
1403          * update the xferred count to indicate the whole request
1404          * was satisfied.
1405          */
1406         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1407         if (obj_request->result == -ENOENT) {
1408                 if (obj_request->type == OBJ_REQUEST_BIO)
1409                         zero_bio_chain(obj_request->bio_list, 0);
1410                 else
1411                         zero_pages(obj_request->pages, 0, length);
1412                 obj_request->result = 0;
1413                 obj_request->xferred = length;
1414         } else if (xferred < length && !obj_request->result) {
1415                 if (obj_request->type == OBJ_REQUEST_BIO)
1416                         zero_bio_chain(obj_request->bio_list, xferred);
1417                 else
1418                         zero_pages(obj_request->pages, xferred, length);
1419                 obj_request->xferred = length;
1420         }
1421         obj_request_done_set(obj_request);
1422 }
1423
1424 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1425 {
1426         dout("%s: obj %p cb %p\n", __func__, obj_request,
1427                 obj_request->callback);
1428         if (obj_request->callback)
1429                 obj_request->callback(obj_request);
1430         else
1431                 complete_all(&obj_request->completion);
1432 }
1433
1434 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1435 {
1436         dout("%s: obj %p\n", __func__, obj_request);
1437         obj_request_done_set(obj_request);
1438 }
1439
1440 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1441 {
1442         struct rbd_img_request *img_request = NULL;
1443         struct rbd_device *rbd_dev = NULL;
1444         bool layered = false;
1445
1446         if (obj_request_img_data_test(obj_request)) {
1447                 img_request = obj_request->img_request;
1448                 layered = img_request && img_request_layered_test(img_request);
1449                 rbd_dev = img_request->rbd_dev;
1450         }
1451
1452         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1453                 obj_request, img_request, obj_request->result,
1454                 obj_request->xferred, obj_request->length);
1455         if (layered && obj_request->result == -ENOENT &&
1456                         obj_request->img_offset < rbd_dev->parent_overlap)
1457                 rbd_img_parent_read(obj_request);
1458         else if (img_request)
1459                 rbd_img_obj_request_read_callback(obj_request);
1460         else
1461                 obj_request_done_set(obj_request);
1462 }
1463
1464 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1465 {
1466         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1467                 obj_request->result, obj_request->length);
1468         /*
1469          * There is no such thing as a successful short write.  Set
1470          * it to our originally-requested length.
1471          */
1472         obj_request->xferred = obj_request->length;
1473         obj_request_done_set(obj_request);
1474 }
1475
1476 /*
1477  * For a simple stat call there's nothing to do.  We'll do more if
1478  * this is part of a write sequence for a layered image.
1479  */
1480 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1481 {
1482         dout("%s: obj %p\n", __func__, obj_request);
1483         obj_request_done_set(obj_request);
1484 }
1485
1486 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1487                                 struct ceph_msg *msg)
1488 {
1489         struct rbd_obj_request *obj_request = osd_req->r_priv;
1490         u16 opcode;
1491
1492         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1493         rbd_assert(osd_req == obj_request->osd_req);
1494         if (obj_request_img_data_test(obj_request)) {
1495                 rbd_assert(obj_request->img_request);
1496                 rbd_assert(obj_request->which != BAD_WHICH);
1497         } else {
1498                 rbd_assert(obj_request->which == BAD_WHICH);
1499         }
1500
1501         if (osd_req->r_result < 0)
1502                 obj_request->result = osd_req->r_result;
1503         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1504
1505         BUG_ON(osd_req->r_num_ops > 2);
1506
1507         /*
1508          * We support a 64-bit length, but ultimately it has to be
1509          * passed to blk_end_request(), which takes an unsigned int.
1510          */
1511         obj_request->xferred = osd_req->r_reply_op_len[0];
1512         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1513         opcode = osd_req->r_ops[0].op;
1514         switch (opcode) {
1515         case CEPH_OSD_OP_READ:
1516                 rbd_osd_read_callback(obj_request);
1517                 break;
1518         case CEPH_OSD_OP_WRITE:
1519                 rbd_osd_write_callback(obj_request);
1520                 break;
1521         case CEPH_OSD_OP_STAT:
1522                 rbd_osd_stat_callback(obj_request);
1523                 break;
1524         case CEPH_OSD_OP_CALL:
1525         case CEPH_OSD_OP_NOTIFY_ACK:
1526         case CEPH_OSD_OP_WATCH:
1527                 rbd_osd_trivial_callback(obj_request);
1528                 break;
1529         default:
1530                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1531                         obj_request->object_name, (unsigned short) opcode);
1532                 break;
1533         }
1534
1535         if (obj_request_done_test(obj_request))
1536                 rbd_obj_request_complete(obj_request);
1537 }
1538
1539 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1540 {
1541         struct rbd_img_request *img_request = obj_request->img_request;
1542         struct ceph_osd_request *osd_req = obj_request->osd_req;
1543         u64 snap_id;
1544
1545         rbd_assert(osd_req != NULL);
1546
1547         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1548         ceph_osdc_build_request(osd_req, obj_request->offset,
1549                         NULL, snap_id, NULL);
1550 }
1551
1552 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1553 {
1554         struct rbd_img_request *img_request = obj_request->img_request;
1555         struct ceph_osd_request *osd_req = obj_request->osd_req;
1556         struct ceph_snap_context *snapc;
1557         struct timespec mtime = CURRENT_TIME;
1558
1559         rbd_assert(osd_req != NULL);
1560
1561         snapc = img_request ? img_request->snapc : NULL;
1562         ceph_osdc_build_request(osd_req, obj_request->offset,
1563                         snapc, CEPH_NOSNAP, &mtime);
1564 }
1565
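/*
 * Create a new osd request containing a single op for the given
 * object request.  A write gets the image request's snapshot context
 * (so the osd can preserve snapshot data); a read is flagged
 * read-only.  The object name and file layout are filled in here;
 * the op itself and the final build step are left to the caller.
 */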
1566 static struct ceph_osd_request *rbd_osd_req_create(
1567                                         struct rbd_device *rbd_dev,
1568                                         bool write_request,
1569                                         struct rbd_obj_request *obj_request)
1570 {
1571         struct ceph_snap_context *snapc = NULL;
1572         struct ceph_osd_client *osdc;
1573         struct ceph_osd_request *osd_req;
1574
1575         if (obj_request_img_data_test(obj_request)) {
1576                 struct rbd_img_request *img_request = obj_request->img_request;
1577
1578                 rbd_assert(write_request ==
1579                                 img_request_write_test(img_request));
1580                 if (write_request)
1581                         snapc = img_request->snapc;
1582         }
1583
1584         /* Allocate and initialize the request, for the single op */
1585
1586         osdc = &rbd_dev->rbd_client->client->osdc;
1587         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1588         if (!osd_req)
1589                 return NULL;    /* ENOMEM */
1590
1591         if (write_request)
1592                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1593         else
1594                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1595
1596         osd_req->r_callback = rbd_osd_req_callback;
1597         osd_req->r_priv = obj_request;
1598
1599         osd_req->r_oid_len = strlen(obj_request->object_name);
1600         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1601         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1602
1603         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1604
1605         return osd_req;
1606 }
1607
1608 /*
1609  * Create a copyup osd request based on the information in the
1610  * object request supplied.  A copyup request has two osd ops,
1611  * a copyup method call, and a "normal" write request.
1612  */
1613 static struct ceph_osd_request *
1614 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1615 {
1616         struct rbd_img_request *img_request;
1617         struct ceph_snap_context *snapc;
1618         struct rbd_device *rbd_dev;
1619         struct ceph_osd_client *osdc;
1620         struct ceph_osd_request *osd_req;
1621
1622         rbd_assert(obj_request_img_data_test(obj_request));
1623         img_request = obj_request->img_request;
1624         rbd_assert(img_request);
1625         rbd_assert(img_request_write_test(img_request));
1626
1627         /* Allocate and initialize the request, for the two ops */
1628
1629         snapc = img_request->snapc;
1630         rbd_dev = img_request->rbd_dev;
1631         osdc = &rbd_dev->rbd_client->client->osdc;
1632         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1633         if (!osd_req)
1634                 return NULL;    /* ENOMEM */
1635
1636         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1637         osd_req->r_callback = rbd_osd_req_callback;
1638         osd_req->r_priv = obj_request;
1639
1640         osd_req->r_oid_len = strlen(obj_request->object_name);
1641         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1642         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1643
1644         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1645
1646         return osd_req;
1647 }
1648
1650 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1651 {
1652         ceph_osdc_put_request(osd_req);
1653 }
1654
1655 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1656
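/*
 * The request structure and a copy of the object name are carved
 * from a single allocation, so freeing the request in
 * rbd_obj_request_destroy() frees the name with it.
 */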
1657 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1658                                                 u64 offset, u64 length,
1659                                                 enum obj_request_type type)
1660 {
1661         struct rbd_obj_request *obj_request;
1662         size_t size;
1663         char *name;
1664
1665         rbd_assert(obj_request_type_valid(type));
1666
1667         size = strlen(object_name) + 1;
1668         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1669         if (!obj_request)
1670                 return NULL;
1671
1672         name = (char *)(obj_request + 1);
1673         obj_request->object_name = memcpy(name, object_name, size);
1674         obj_request->offset = offset;
1675         obj_request->length = length;
1676         obj_request->flags = 0;
1677         obj_request->which = BAD_WHICH;
1678         obj_request->type = type;
1679         INIT_LIST_HEAD(&obj_request->links);
1680         init_completion(&obj_request->completion);
1681         kref_init(&obj_request->kref);
1682
1683         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1684                 offset, length, (int)type, obj_request);
1685
1686         return obj_request;
1687 }
1688
1689 static void rbd_obj_request_destroy(struct kref *kref)
1690 {
1691         struct rbd_obj_request *obj_request;
1692
1693         obj_request = container_of(kref, struct rbd_obj_request, kref);
1694
1695         dout("%s: obj %p\n", __func__, obj_request);
1696
1697         rbd_assert(obj_request->img_request == NULL);
1698         rbd_assert(obj_request->which == BAD_WHICH);
1699
1700         if (obj_request->osd_req)
1701                 rbd_osd_req_destroy(obj_request->osd_req);
1702
1703         rbd_assert(obj_request_type_valid(obj_request->type));
1704         switch (obj_request->type) {
1705         case OBJ_REQUEST_NODATA:
1706                 break;          /* Nothing to do */
1707         case OBJ_REQUEST_BIO:
1708                 if (obj_request->bio_list)
1709                         bio_chain_put(obj_request->bio_list);
1710                 break;
1711         case OBJ_REQUEST_PAGES:
1712                 if (obj_request->pages)
1713                         ceph_release_page_vector(obj_request->pages,
1714                                                 obj_request->page_count);
1715                 break;
1716         }
1717
1718         kfree(obj_request);
1719 }
1720
1721 /*
1722  * Caller is responsible for filling in the list of object requests
1723  * that comprises the image request, and the Linux request pointer
1724  * (if there is one).
1725  */
1726 static struct rbd_img_request *rbd_img_request_create(
1727                                         struct rbd_device *rbd_dev,
1728                                         u64 offset, u64 length,
1729                                         bool write_request,
1730                                         bool child_request)
1731 {
1732         struct rbd_img_request *img_request;
1733         struct ceph_snap_context *snapc = NULL;
1734
1735         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1736         if (!img_request)
1737                 return NULL;
1738
1739         if (write_request) {
1740                 down_read(&rbd_dev->header_rwsem);
1741                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1742                 up_read(&rbd_dev->header_rwsem);
1743                 if (WARN_ON(!snapc)) {
1744                         kfree(img_request);
1745                         return NULL;    /* Shouldn't happen */
1746                 }
1748         }
1749
1750         img_request->rq = NULL;
1751         img_request->rbd_dev = rbd_dev;
1752         img_request->offset = offset;
1753         img_request->length = length;
1754         img_request->flags = 0;
1755         if (write_request) {
1756                 img_request_write_set(img_request);
1757                 img_request->snapc = snapc;
1758         } else {
1759                 img_request->snap_id = rbd_dev->spec->snap_id;
1760         }
1761         if (child_request)
1762                 img_request_child_set(img_request);
1763         if (rbd_dev->parent_spec)
1764                 img_request_layered_set(img_request);
1765         spin_lock_init(&img_request->completion_lock);
1766         img_request->next_completion = 0;
1767         img_request->callback = NULL;
1768         img_request->result = 0;
1769         img_request->obj_request_count = 0;
1770         INIT_LIST_HEAD(&img_request->obj_requests);
1771         kref_init(&img_request->kref);
1772
1773         rbd_img_request_get(img_request);       /* Avoid a warning */
1774         rbd_img_request_put(img_request);       /* TEMPORARY */
1775
1776         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1777                 write_request ? "write" : "read", offset, length,
1778                 img_request);
1779
1780         return img_request;
1781 }
1782
1783 static void rbd_img_request_destroy(struct kref *kref)
1784 {
1785         struct rbd_img_request *img_request;
1786         struct rbd_obj_request *obj_request;
1787         struct rbd_obj_request *next_obj_request;
1788
1789         img_request = container_of(kref, struct rbd_img_request, kref);
1790
1791         dout("%s: img %p\n", __func__, img_request);
1792
1793         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1794                 rbd_img_obj_request_del(img_request, obj_request);
1795         rbd_assert(img_request->obj_request_count == 0);
1796
1797         if (img_request_write_test(img_request))
1798                 ceph_put_snap_context(img_request->snapc);
1799
1800         if (img_request_child_test(img_request))
1801                 rbd_obj_request_put(img_request->obj_request);
1802
1803         kfree(img_request);
1804 }
1805
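/*
 * Account for the completion of one object request within its image
 * request.  The first failure seen becomes the image request's
 * overall result.  For a request driven by the block layer, the
 * transfer count is fed to blk_end_request(); for a child (parent
 * read) request, completion is instead tracked by the object
 * request's position.  Returns true if the image request is not yet
 * fully complete.
 */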
1806 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1807 {
1808         struct rbd_img_request *img_request;
1809         unsigned int xferred;
1810         int result;
1811         bool more;
1812
1813         rbd_assert(obj_request_img_data_test(obj_request));
1814         img_request = obj_request->img_request;
1815
1816         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1817         xferred = (unsigned int)obj_request->xferred;
1818         result = obj_request->result;
1819         if (result) {
1820                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1821
1822                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1823                         img_request_write_test(img_request) ? "write" : "read",
1824                         obj_request->length, obj_request->img_offset,
1825                         obj_request->offset);
1826                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1827                         result, xferred);
1828                 if (!img_request->result)
1829                         img_request->result = result;
1830         }
1831
1832         /* Image object requests don't own their page array */
1833
1834         if (obj_request->type == OBJ_REQUEST_PAGES) {
1835                 obj_request->pages = NULL;
1836                 obj_request->page_count = 0;
1837         }
1838
1839         if (img_request_child_test(img_request)) {
1840                 rbd_assert(img_request->obj_request != NULL);
1841                 more = obj_request->which < img_request->obj_request_count - 1;
1842         } else {
1843                 rbd_assert(img_request->rq != NULL);
1844                 more = blk_end_request(img_request->rq, result, xferred);
1845         }
1846
1847         return more;
1848 }
1849
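/*
 * Per-object completion callback for image requests.  Object
 * requests can complete out of order, so under the completion lock
 * we advance next_completion over the longest prefix of finished
 * object requests, and complete the image request only once the
 * last of them has ended.
 */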
1850 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1851 {
1852         struct rbd_img_request *img_request;
1853         u32 which = obj_request->which;
1854         bool more = true;
1855
1856         rbd_assert(obj_request_img_data_test(obj_request));
1857         img_request = obj_request->img_request;
1858
1859         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1860         rbd_assert(img_request != NULL);
1861         rbd_assert(img_request->obj_request_count > 0);
1862         rbd_assert(which != BAD_WHICH);
1863         rbd_assert(which < img_request->obj_request_count);
1864         rbd_assert(which >= img_request->next_completion);
1865
1866         spin_lock_irq(&img_request->completion_lock);
1867         if (which != img_request->next_completion)
1868                 goto out;
1869
1870         for_each_obj_request_from(img_request, obj_request) {
1871                 rbd_assert(more);
1872                 rbd_assert(which < img_request->obj_request_count);
1873
1874                 if (!obj_request_done_test(obj_request))
1875                         break;
1876                 more = rbd_img_obj_end_request(obj_request);
1877                 which++;
1878         }
1879
1880         rbd_assert(more ^ (which == img_request->obj_request_count));
1881         img_request->next_completion = which;
1882 out:
1883         spin_unlock_irq(&img_request->completion_lock);
1884
1885         if (!more)
1886                 rbd_img_request_complete(img_request);
1887 }
1888
1889 /*
1890  * Split up an image request into one or more object requests, each
1891  * to a different object.  The "type" parameter indicates whether
1892  * "data_desc" is the pointer to the head of a list of bio
1893  * structures, or the base of a page array.  In either case this
1894  * function assumes data_desc describes memory sufficient to hold
1895  * all data described by the image request.
1896  */
1897 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1898                                         enum obj_request_type type,
1899                                         void *data_desc)
1900 {
1901         struct rbd_device *rbd_dev = img_request->rbd_dev;
1902         struct rbd_obj_request *obj_request = NULL;
1903         struct rbd_obj_request *next_obj_request;
1904         bool write_request = img_request_write_test(img_request);
1905         struct bio *bio_list;
1906         unsigned int bio_offset = 0;
1907         struct page **pages;
1908         u64 img_offset;
1909         u64 resid;
1910         u16 opcode;
1911
1912         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1913                 (int)type, data_desc);
1914
1915         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1916         img_offset = img_request->offset;
1917         resid = img_request->length;
1918         rbd_assert(resid > 0);
1919
1920         if (type == OBJ_REQUEST_BIO) {
1921                 bio_list = data_desc;
1922                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1923         } else {
1924                 rbd_assert(type == OBJ_REQUEST_PAGES);
1925                 pages = data_desc;
1926         }
1927
1928         while (resid) {
1929                 struct ceph_osd_request *osd_req;
1930                 const char *object_name;
1931                 u64 offset;
1932                 u64 length;
1933
1934                 object_name = rbd_segment_name(rbd_dev, img_offset);
1935                 if (!object_name)
1936                         goto out_unwind;
1937                 offset = rbd_segment_offset(rbd_dev, img_offset);
1938                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1939                 obj_request = rbd_obj_request_create(object_name,
1940                                                 offset, length, type);
1941                 kfree(object_name);     /* object request has its own copy */
1942                 if (!obj_request)
1943                         goto out_unwind;
1944
1945                 if (type == OBJ_REQUEST_BIO) {
1946                         unsigned int clone_size;
1947
1948                         rbd_assert(length <= (u64)UINT_MAX);
1949                         clone_size = (unsigned int)length;
1950                         obj_request->bio_list =
1951                                         bio_chain_clone_range(&bio_list,
1952                                                                 &bio_offset,
1953                                                                 clone_size,
1954                                                                 GFP_ATOMIC);
1955                         if (!obj_request->bio_list)
1956                                 goto out_partial;
1957                 } else {
1958                         unsigned int page_count;
1959
1960                         obj_request->pages = pages;
1961                         page_count = (u32)calc_pages_for(offset, length);
1962                         obj_request->page_count = page_count;
1963                         if ((offset + length) & ~PAGE_MASK)
1964                                 page_count--;   /* more on last page */
1965                         pages += page_count;
1966                 }
1967
1968                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1969                                                 obj_request);
1970                 if (!osd_req)
1971                         goto out_partial;
1972                 obj_request->osd_req = osd_req;
1973                 obj_request->callback = rbd_img_obj_callback;
1974
1975                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1976                                                 0, 0);
1977                 if (type == OBJ_REQUEST_BIO)
1978                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1979                                         obj_request->bio_list, length);
1980                 else
1981                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1982                                         obj_request->pages, length,
1983                                         offset & ~PAGE_MASK, false, false);
1984
1985                 if (write_request)
1986                         rbd_osd_req_format_write(obj_request);
1987                 else
1988                         rbd_osd_req_format_read(obj_request);
1989
1990                 obj_request->img_offset = img_offset;
1991                 rbd_img_obj_request_add(img_request, obj_request);
1992
1993                 img_offset += length;
1994                 resid -= length;
1995         }
1996
1997         return 0;
1998
1999 out_partial:
2000         rbd_obj_request_put(obj_request);
2001 out_unwind:
2002         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2003                 rbd_obj_request_put(obj_request);
2004
2005         return -ENOMEM;
2006 }
2007
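/*
 * A minimal sketch of how the create/fill/submit interfaces above
 * fit together for a block-layer request (this mirrors what
 * rbd_request_fn() does later in this file):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	if (img_request) {
 *		img_request->rq = rq;
 *		result = rbd_img_request_fill(img_request,
 *						OBJ_REQUEST_BIO, rq->bio);
 *		if (!result)
 *			result = rbd_img_request_submit(img_request);
 *	}
 */
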
2008 static void
2009 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2010 {
2011         struct rbd_img_request *img_request;
2012         struct rbd_device *rbd_dev;
2013         u64 length;
2014         u32 page_count;
2015
2016         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2017         rbd_assert(obj_request_img_data_test(obj_request));
2018         img_request = obj_request->img_request;
2019         rbd_assert(img_request);
2020
2021         rbd_dev = img_request->rbd_dev;
2022         rbd_assert(rbd_dev);
2023         length = (u64)1 << rbd_dev->header.obj_order;
2024         page_count = (u32)calc_pages_for(0, length);
2025
2026         rbd_assert(obj_request->copyup_pages);
2027         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2028         obj_request->copyup_pages = NULL;
2029
2030         /*
2031          * We want the transfer count to reflect the size of the
2032          * original write request.  There is no such thing as a
2033          * successful short write, so if the request was successful
2034          * we can just set it to the originally-requested length.
2035          */
2036         if (!obj_request->result)
2037                 obj_request->xferred = obj_request->length;
2038
2039         /* Finish up with the normal image object callback */
2040
2041         rbd_img_obj_callback(obj_request);
2042 }
2043
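/*
 * Completion handler for the parent read issued by
 * rbd_img_obj_parent_read_full() below.  If the read succeeded,
 * build the two-op copyup request for the original object request
 * (the copyup method call carrying the parent data, followed by the
 * original write) and submit it; on any error, record the result on
 * the original request and complete it.
 */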
2044 static void
2045 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2046 {
2047         struct rbd_obj_request *orig_request;
2048         struct ceph_osd_request *osd_req;
2049         struct ceph_osd_client *osdc;
2050         struct rbd_device *rbd_dev;
2051         struct page **pages;
2052         int result;
2053         u64 obj_size;
2054         u64 xferred;
2055
2056         rbd_assert(img_request_child_test(img_request));
2057
2058         /* First get what we need from the image request */
2059
2060         pages = img_request->copyup_pages;
2061         rbd_assert(pages != NULL);
2062         img_request->copyup_pages = NULL;
2063
2064         orig_request = img_request->obj_request;
2065         rbd_assert(orig_request != NULL);
2066         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2067         result = img_request->result;
2068         obj_size = img_request->length;
2069         xferred = img_request->xferred;
2070
2071         rbd_dev = img_request->rbd_dev;
2072         rbd_assert(rbd_dev);
2073         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2074
2075         rbd_img_request_put(img_request);
2076
2077         if (result)
2078                 goto out_err;
2079
2080         /* Allocate the new copyup osd request for the original request */
2081
2082         result = -ENOMEM;
2083         rbd_assert(!orig_request->osd_req);
2084         osd_req = rbd_osd_req_create_copyup(orig_request);
2085         if (!osd_req)
2086                 goto out_err;
2087         orig_request->osd_req = osd_req;
2088         orig_request->copyup_pages = pages;
2089
2090         /* Initialize the copyup op */
2091
2092         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2093         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2094                                                 false, false);
2095
2096         /* Then the original write request op */
2097
2098         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2099                                         orig_request->offset,
2100                                         orig_request->length, 0, 0);
2101         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2102                                         orig_request->length);
2103
2104         rbd_osd_req_format_write(orig_request);
2105
2106         /* All set, send it off. */
2107
2108         orig_request->callback = rbd_img_obj_copyup_callback;
2109         osdc = &rbd_dev->rbd_client->client->osdc;
2110         result = rbd_obj_request_submit(osdc, orig_request);
2111         if (!result)
2112                 return;
2113 out_err:
2114         /* Record the error code and complete the request */
2115
2116         orig_request->result = result;
2117         orig_request->xferred = 0;
2118         obj_request_done_set(orig_request);
2119         rbd_obj_request_complete(orig_request);
2120 }
2121
2122 /*
2123  * Read from the parent image the range of data that covers the
2124  * entire target of the given object request.  This is used for
2125  * satisfying a layered image write request when the target of an
2126  * object request from the image request does not exist.
2127  *
2128  * A page array big enough to hold the returned data is allocated
2129  * and supplied to rbd_img_request_fill() as the "data descriptor."
2130  * When the read completes, this page array will be transferred to
2131  * the original object request for the copyup operation.
2132  *
2133  * If an error occurs, record it as the result of the original
2134  * object request and mark it done so it gets completed.
2135  */
2136 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2137 {
2138         struct rbd_img_request *img_request = NULL;
2139         struct rbd_img_request *parent_request = NULL;
2140         struct rbd_device *rbd_dev;
2141         u64 img_offset;
2142         u64 length;
2143         struct page **pages = NULL;
2144         u32 page_count;
2145         int result;
2146
2147         rbd_assert(obj_request_img_data_test(obj_request));
2148         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2149
2150         img_request = obj_request->img_request;
2151         rbd_assert(img_request != NULL);
2152         rbd_dev = img_request->rbd_dev;
2153         rbd_assert(rbd_dev->parent != NULL);
2154
2155         /*
2156          * First things first.  The original osd request is of no
2157          * use to us any more; we'll need a new one that can hold
2158          * the two ops in a copyup request.  We'll get that later,
2159          * but for now we can release the old one.
2160          */
2161         rbd_osd_req_destroy(obj_request->osd_req);
2162         obj_request->osd_req = NULL;
2163
2164         /*
2165          * Determine the byte range covered by the object in the
2166          * child image to which the original request was to be sent.
2167          */
2168         img_offset = obj_request->img_offset - obj_request->offset;
2169         length = (u64)1 << rbd_dev->header.obj_order;
2170
2171         /*
2172          * There is no defined parent data beyond the parent
2173          * overlap, so limit what we read at that boundary if
2174          * necessary.
2175          */
2176         if (img_offset + length > rbd_dev->parent_overlap) {
2177                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2178                 length = rbd_dev->parent_overlap - img_offset;
2179         }
2180
2181         /*
2182          * Allocate a page array big enough to receive the data read
2183          * from the parent.
2184          */
2185         page_count = (u32)calc_pages_for(0, length);
2186         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2187         if (IS_ERR(pages)) {
2188                 result = PTR_ERR(pages);
2189                 pages = NULL;
2190                 goto out_err;
2191         }
2192
2193         result = -ENOMEM;
2194         parent_request = rbd_img_request_create(rbd_dev->parent,
2195                                                 img_offset, length,
2196                                                 false, true);
2197         if (!parent_request)
2198                 goto out_err;
2199         rbd_obj_request_get(obj_request);
2200         parent_request->obj_request = obj_request;
2201
2202         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2203         if (result)
2204                 goto out_err;
2205         parent_request->copyup_pages = pages;
2206
2207         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2208         result = rbd_img_request_submit(parent_request);
2209         if (!result)
2210                 return 0;
2211
2212         parent_request->copyup_pages = NULL;
2213         parent_request->obj_request = NULL;
2214         rbd_obj_request_put(obj_request);
2215 out_err:
2216         if (pages)
2217                 ceph_release_page_vector(pages, page_count);
2218         if (parent_request)
2219                 rbd_img_request_put(parent_request);
2220         obj_request->result = result;
2221         obj_request->xferred = 0;
2222         obj_request_done_set(obj_request);
2223
2224         return result;
2225 }
2226
2227 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2228 {
2229         struct rbd_obj_request *orig_request;
2230         int result;
2231
2232         rbd_assert(!obj_request_img_data_test(obj_request));
2233
2234         /*
2235          * All we need from the object request is the original
2236          * request and the result of the STAT op.  Grab those, then
2237          * we're done with the request.
2238          */
2239         orig_request = obj_request->obj_request;
2240         obj_request->obj_request = NULL;
2241         rbd_assert(orig_request);
2242         rbd_assert(orig_request->img_request);
2243
2244         result = obj_request->result;
2245         obj_request->result = 0;
2246
2247         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2248                 obj_request, orig_request, result,
2249                 obj_request->xferred, obj_request->length);
2250         rbd_obj_request_put(obj_request);
2251
2255         /*
2256          * Our only purpose here is to determine whether the object
2257          * exists, and we don't want to treat the non-existence as
2258          * an error.  If something else comes back, transfer the
2259          * error to the original request and complete it now.
2260          */
2261         if (!result) {
2262                 obj_request_existence_set(orig_request, true);
2263         } else if (result == -ENOENT) {
2264                 obj_request_existence_set(orig_request, false);
2265         } else {
2266                 orig_request->result = result;
2267                 goto out;
2268         }
2269
2270         /*
2271          * Resubmit the original request now that we have recorded
2272          * whether the target object exists.
2273          */
2274         orig_request->result = rbd_img_obj_request_submit(orig_request);
2275 out:
2276         if (orig_request->result)
2277                 rbd_obj_request_complete(orig_request);
2278         rbd_obj_request_put(orig_request);
2279 }
2280
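/*
 * Issue a STAT op to learn whether the target object of a layered
 * write already exists.  The stat request holds a reference to the
 * original object request; rbd_img_obj_exists_callback() records
 * the existence result there and resubmits the original request.
 */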
2281 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2282 {
2283         struct rbd_obj_request *stat_request;
2284         struct rbd_device *rbd_dev;
2285         struct ceph_osd_client *osdc;
2286         struct page **pages = NULL;
2287         u32 page_count;
2288         size_t size;
2289         int ret;
2290
2291         /*
2292          * The response data for a STAT call consists of:
2293          *     le64 length;
2294          *     struct {
2295          *         le32 tv_sec;
2296          *         le32 tv_nsec;
2297          *     } mtime;
2298          */
2299         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2300         page_count = (u32)calc_pages_for(0, size);
2301         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2302         if (IS_ERR(pages))
2303                 return PTR_ERR(pages);
2304
2305         ret = -ENOMEM;
2306         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2307                                                         OBJ_REQUEST_PAGES);
2308         if (!stat_request)
2309                 goto out;
2310
2311         rbd_obj_request_get(obj_request);
2312         stat_request->obj_request = obj_request;
2313         stat_request->pages = pages;
2314         stat_request->page_count = page_count;
2315
2316         rbd_assert(obj_request->img_request);
2317         rbd_dev = obj_request->img_request->rbd_dev;
2318         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2319                                                 stat_request);
2320         if (!stat_request->osd_req)
2321                 goto out;
2322         stat_request->callback = rbd_img_obj_exists_callback;
2323
2324         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2325         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2326                                         false, false);
2327         rbd_osd_req_format_read(stat_request);
2328
2329         osdc = &rbd_dev->rbd_client->client->osdc;
2330         ret = rbd_obj_request_submit(osdc, stat_request);
2331 out:
2332         if (ret)
2333                 rbd_obj_request_put(obj_request);
2334
2335         return ret;
2336 }
2337
2338 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2339 {
2340         struct rbd_img_request *img_request;
2341         struct rbd_device *rbd_dev;
2342         bool known;
2343
2344         rbd_assert(obj_request_img_data_test(obj_request));
2345
2346         img_request = obj_request->img_request;
2347         rbd_assert(img_request);
2348         rbd_dev = img_request->rbd_dev;
2349
2350         /*
2351          * Only writes to layered images need special handling.
2352          * Reads and non-layered writes are simple object requests.
2353          * Layered writes that start beyond the end of the overlap
2354          * with the parent have no parent data, so they too are
2355          * simple object requests.  Finally, if the target object is
2356          * known to already exist, its parent data has already been
2357          * copied, so a write to the object can also be handled as a
2358          * simple object request.
2359          */
2360         if (!img_request_write_test(img_request) ||
2361                 !img_request_layered_test(img_request) ||
2362                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2363                 ((known = obj_request_known_test(obj_request)) &&
2364                         obj_request_exists_test(obj_request))) {
2365
2366                 struct ceph_osd_client *osdc;
2367
2368                 osdc = &rbd_dev->rbd_client->client->osdc;
2369
2370                 return rbd_obj_request_submit(osdc, obj_request);
2373         }
2374
2375         /*
2376          * It's a layered write.  The target object might exist but
2377          * we may not know that yet.  If we know it doesn't exist,
2378          * start by reading the data for the full target object from
2379          * the parent so we can use it for a copyup to the target.
2380          */
2381         if (known)
2382                 return rbd_img_obj_parent_read_full(obj_request);
2383
2384         /* We don't know whether the target exists.  Go find out. */
2385
2386         return rbd_img_obj_exists_submit(obj_request);
2387 }
2388
2389 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2390 {
2391         struct rbd_obj_request *obj_request;
2392         struct rbd_obj_request *next_obj_request;
2393
2394         dout("%s: img %p\n", __func__, img_request);
2395         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2396                 int ret;
2397
2398                 ret = rbd_img_obj_request_submit(obj_request);
2399                 if (ret)
2400                         return ret;
2401         }
2402
2403         return 0;
2404 }
2405
2406 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2407 {
2408         struct rbd_obj_request *obj_request;
2409         struct rbd_device *rbd_dev;
2410         u64 obj_end;
2411
2412         rbd_assert(img_request_child_test(img_request));
2413
2414         obj_request = img_request->obj_request;
2415         rbd_assert(obj_request);
2416         rbd_assert(obj_request->img_request);
2417
2418         obj_request->result = img_request->result;
2419         if (obj_request->result)
2420                 goto out;
2421
2422         /*
2423          * We need to zero anything beyond the parent overlap
2424          * boundary.  Since rbd_img_obj_request_read_callback()
2425          * will zero anything beyond the end of a short read, an
2426          * easy way to do this is to pretend the data from the
2427          * parent came up short--ending at the overlap boundary.
2428          */
2429         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2430         obj_end = obj_request->img_offset + obj_request->length;
2431         rbd_dev = obj_request->img_request->rbd_dev;
2432         if (obj_end > rbd_dev->parent_overlap) {
2433                 u64 xferred = 0;
2434
2435                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2436                         xferred = rbd_dev->parent_overlap -
2437                                         obj_request->img_offset;
2438
2439                 obj_request->xferred = min(img_request->xferred, xferred);
2440         } else {
2441                 obj_request->xferred = img_request->xferred;
2442         }
2443 out:
2444         rbd_img_obj_request_read_callback(obj_request);
2445         rbd_obj_request_complete(obj_request);
2446 }
2447
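/*
 * A read of an object in a layered image came back -ENOENT, meaning
 * the object has not been written since the clone was created.
 * Satisfy the read from the parent image instead; anything beyond
 * the parent overlap is zeroed by the read callback.
 */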
2448 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2449 {
2450         struct rbd_device *rbd_dev;
2451         struct rbd_img_request *img_request;
2452         int result;
2453
2454         rbd_assert(obj_request_img_data_test(obj_request));
2455         rbd_assert(obj_request->img_request != NULL);
2456         rbd_assert(obj_request->result == (s32) -ENOENT);
2457         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2458
2459         rbd_dev = obj_request->img_request->rbd_dev;
2460         rbd_assert(rbd_dev->parent != NULL);
2462         img_request = rbd_img_request_create(rbd_dev->parent,
2463                                                 obj_request->img_offset,
2464                                                 obj_request->length,
2465                                                 false, true);
2466         result = -ENOMEM;
2467         if (!img_request)
2468                 goto out_err;
2469
2470         rbd_obj_request_get(obj_request);
2471         img_request->obj_request = obj_request;
2472
2473         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2474                                         obj_request->bio_list);
2475         if (result)
2476                 goto out_err;
2477
2478         img_request->callback = rbd_img_parent_read_callback;
2479         result = rbd_img_request_submit(img_request);
2480         if (result)
2481                 goto out_err;
2482
2483         return;
2484 out_err:
2485         if (img_request)
2486                 rbd_img_request_put(img_request);
2487         obj_request->result = result;
2488         obj_request->xferred = 0;
2489         obj_request_done_set(obj_request);
2490 }
2491
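/*
 * Acknowledge a notification on the header object so the osd stops
 * re-sending it.  This is fire-and-forget: the object request's
 * callback is rbd_obj_request_put(), so it frees itself when the
 * ack completes.
 */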
2492 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2493                                    u64 ver, u64 notify_id)
2494 {
2495         struct rbd_obj_request *obj_request;
2496         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2497         int ret;
2498
2499         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2500                                                         OBJ_REQUEST_NODATA);
2501         if (!obj_request)
2502                 return -ENOMEM;
2503
2504         ret = -ENOMEM;
2505         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2506         if (!obj_request->osd_req)
2507                 goto out;
2508         obj_request->callback = rbd_obj_request_put;
2509
2510         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2511                                         notify_id, ver, 0);
2512         rbd_osd_req_format_read(obj_request);
2513
2514         ret = rbd_obj_request_submit(osdc, obj_request);
2515 out:
2516         if (ret)
2517                 rbd_obj_request_put(obj_request);
2518
2519         return ret;
2520 }
2521
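/*
 * Called by the osd client when the watched header object changes
 * (for example, after a snapshot is created or the image is
 * resized).  Refresh our cached header, then acknowledge the
 * notification.
 */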
2522 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2523 {
2524         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2525         u64 hver;
2526         int rc;
2527
2528         if (!rbd_dev)
2529                 return;
2530
2531         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2532                 rbd_dev->header_name, (unsigned long long) notify_id,
2533                 (unsigned int) opcode);
2534         rc = rbd_dev_refresh(rbd_dev, &hver);
2535         if (rc)
2536                 rbd_warn(rbd_dev, "got notification but failed to "
2537                            "update snaps: %d\n", rc);
2538
2539         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2540 }
2541
2542 /*
2543  * Request sync osd watch/unwatch.  The value of "start" determines
2544  * whether a watch request is being initiated or torn down.
2545  */
2546 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2547 {
2548         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2549         struct rbd_obj_request *obj_request;
2550         int ret;
2551
2552         rbd_assert(start ^ !!rbd_dev->watch_event);
2553         rbd_assert(start ^ !!rbd_dev->watch_request);
2554
2555         if (start) {
2556                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2557                                                 &rbd_dev->watch_event);
2558                 if (ret < 0)
2559                         return ret;
2560                 rbd_assert(rbd_dev->watch_event != NULL);
2561         }
2562
2563         ret = -ENOMEM;
2564         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2565                                                         OBJ_REQUEST_NODATA);
2566         if (!obj_request)
2567                 goto out_cancel;
2568
2569         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2570         if (!obj_request->osd_req)
2571                 goto out_cancel;
2572
2573         if (start)
2574                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2575         else
2576                 ceph_osdc_unregister_linger_request(osdc,
2577                                         rbd_dev->watch_request->osd_req);
2578
2579         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2580                                 rbd_dev->watch_event->cookie,
2581                                 rbd_dev->header.obj_version, start);
2582         rbd_osd_req_format_write(obj_request);
2583
2584         ret = rbd_obj_request_submit(osdc, obj_request);
2585         if (ret)
2586                 goto out_cancel;
2587         ret = rbd_obj_request_wait(obj_request);
2588         if (ret)
2589                 goto out_cancel;
2590         ret = obj_request->result;
2591         if (ret)
2592                 goto out_cancel;
2593
2594         /*
2595          * A watch request is set to linger, so the underlying osd
2596          * request won't go away until we unregister it.  We retain
2597          * a pointer to the object request during that time (in
2598          * rbd_dev->watch_request), so we'll keep a reference to
2599          * it.  We'll drop that reference (below) after we've
2600          * unregistered it.
2601          */
2602         if (start) {
2603                 rbd_dev->watch_request = obj_request;
2604
2605                 return 0;
2606         }
2607
2608         /* We have successfully torn down the watch request */
2609
2610         rbd_obj_request_put(rbd_dev->watch_request);
2611         rbd_dev->watch_request = NULL;
2612 out_cancel:
2613         /* Cancel the event if we're tearing down, or on error */
2614         ceph_osdc_cancel_event(rbd_dev->watch_event);
2615         rbd_dev->watch_event = NULL;
2616         if (obj_request)
2617                 rbd_obj_request_put(obj_request);
2618
2619         return ret;
2620 }
2621
2622 /*
2623  * Synchronous osd object method call
2624  */
2625 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2626                              const char *object_name,
2627                              const char *class_name,
2628                              const char *method_name,
2629                              const void *outbound,
2630                              size_t outbound_size,
2631                              void *inbound,
2632                              size_t inbound_size,
2633                              u64 *version)
2634 {
2635         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2636         struct rbd_obj_request *obj_request;
2637         struct page **pages;
2638         u32 page_count;
2639         int ret;
2640
2641         /*
2642          * Method calls are ultimately read operations.  The result
2643          * should be placed into the inbound buffer provided.  They
2644          * also supply outbound data--parameters for the object
2645          * method.  Currently if this is present it will be a
2646          * snapshot id.
2647          */
2648         page_count = (u32)calc_pages_for(0, inbound_size);
2649         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2650         if (IS_ERR(pages))
2651                 return PTR_ERR(pages);
2652
2653         ret = -ENOMEM;
2654         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2655                                                         OBJ_REQUEST_PAGES);
2656         if (!obj_request)
2657                 goto out;
2658
2659         obj_request->pages = pages;
2660         obj_request->page_count = page_count;
2661
2662         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2663         if (!obj_request->osd_req)
2664                 goto out;
2665
2666         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2667                                         class_name, method_name);
2668         if (outbound_size) {
2669                 struct ceph_pagelist *pagelist;
2670
2671                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2672                 if (!pagelist)
2673                         goto out;
2674
2675                 ceph_pagelist_init(pagelist);
2676                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2677                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2678                                                 pagelist);
2679         }
2680         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2681                                         obj_request->pages, inbound_size,
2682                                         0, false, false);
2683         rbd_osd_req_format_read(obj_request);
2684
2685         ret = rbd_obj_request_submit(osdc, obj_request);
2686         if (ret)
2687                 goto out;
2688         ret = rbd_obj_request_wait(obj_request);
2689         if (ret)
2690                 goto out;
2691
2692         ret = obj_request->result;
2693         if (ret < 0)
2694                 goto out;
2695
2696         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2697         ret = (int)obj_request->xferred;
2698         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2699         if (version)
2700                 *version = obj_request->version;
2701 out:
2702         if (obj_request)
2703                 rbd_obj_request_put(obj_request);
2704         else
2705                 ceph_release_page_vector(pages, page_count);
2706
2707         return ret;
2708 }
2709
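/*
 * The block layer request function.  Entered with the queue lock
 * held; the lock is dropped while each fetched request is turned
 * into an image request, filled with object requests, and
 * submitted, then reacquired to end the request on error or fetch
 * the next one.
 */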
2710 static void rbd_request_fn(struct request_queue *q)
2711                 __releases(q->queue_lock) __acquires(q->queue_lock)
2712 {
2713         struct rbd_device *rbd_dev = q->queuedata;
2714         bool read_only = rbd_dev->mapping.read_only;
2715         struct request *rq;
2716         int result;
2717
2718         while ((rq = blk_fetch_request(q))) {
2719                 bool write_request = rq_data_dir(rq) == WRITE;
2720                 struct rbd_img_request *img_request;
2721                 u64 offset;
2722                 u64 length;
2723
2724                 /* Ignore any non-FS requests that filter through. */
2725
2726                 if (rq->cmd_type != REQ_TYPE_FS) {
2727                         dout("%s: non-fs request type %d\n", __func__,
2728                                 (int) rq->cmd_type);
2729                         __blk_end_request_all(rq, 0);
2730                         continue;
2731                 }
2732
2733                 /* Ignore/skip any zero-length requests */
2734
2735                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2736                 length = (u64) blk_rq_bytes(rq);
2737
2738                 if (!length) {
2739                         dout("%s: zero-length request\n", __func__);
2740                         __blk_end_request_all(rq, 0);
2741                         continue;
2742                 }
2743
2744                 spin_unlock_irq(q->queue_lock);
2745
2746                 /* Disallow writes to a read-only device */
2747
2748                 if (write_request) {
2749                         result = -EROFS;
2750                         if (read_only)
2751                                 goto end_request;
2752                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2753                 }
2754
2755                 /*
2756                  * Quit early if the mapped snapshot no longer
2757                  * exists.  It's still possible the snapshot will
2758                  * have disappeared by the time our request arrives
2759                  * at the osd, but there's no sense in sending it if
2760                  * we already know.
2761                  */
2762                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2763                         dout("request for non-existent snapshot\n");
2764                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2765                         result = -ENXIO;
2766                         goto end_request;
2767                 }
2768
2769                 result = -EINVAL;
2770                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2771                         goto end_request;       /* Shouldn't happen */
2772
2773                 result = -ENOMEM;
2774                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2775                                                         write_request, false);
2776                 if (!img_request)
2777                         goto end_request;
2778
2779                 img_request->rq = rq;
2780
2781                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2782                                                 rq->bio);
2783                 if (!result)
2784                         result = rbd_img_request_submit(img_request);
2785                 if (result)
2786                         rbd_img_request_put(img_request);
2787 end_request:
2788                 spin_lock_irq(q->queue_lock);
2789                 if (result < 0) {
2790                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2791                                 write_request ? "write" : "read",
2792                                 length, offset, result);
2793
2794                         __blk_end_request_all(rq, result);
2795                 }
2796         }
2797 }
2798
2799 /*
2800  * a queue callback. Makes sure that we don't create a bio that spans across
2801  * multiple osd objects. One exception would be with a single page bios,
2802  * which we handle later at bio_chain_clone_range()
2803  */
2804 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2805                           struct bio_vec *bvec)
2806 {
2807         struct rbd_device *rbd_dev = q->queuedata;
2808         sector_t sector_offset;
2809         sector_t sectors_per_obj;
2810         sector_t obj_sector_offset;
2811         int ret;
2812
2813         /*
2814          * Convert the bio's partition-relative start sector to an
2815          * offset relative to the enclosing device, and find how far
2816          * into its rbd object that offset falls.
2817          */
2818         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2819         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2820         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2821
2822         /*
2823          * Compute the number of bytes from that offset to the end
2824          * of the object.  Account for what's already used by the bio.
2825          */
2826         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2827         if (ret > bmd->bi_size)
2828                 ret -= bmd->bi_size;
2829         else
2830                 ret = 0;
2831
2832         /*
2833          * Don't send back more than was asked for.  And if the bio
2834          * was empty, let the whole thing through because:  "Note
2835          * that a block device *must* allow a single page to be
2836          * added to an empty bio."
2837          */
2838         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2839         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2840                 ret = (int) bvec->bv_len;
2841
2842         return ret;
2843 }
2844
2845 static void rbd_free_disk(struct rbd_device *rbd_dev)
2846 {
2847         struct gendisk *disk = rbd_dev->disk;
2848
2849         if (!disk)
2850                 return;
2851
2852         if (disk->flags & GENHD_FL_UP)
2853                 del_gendisk(disk);
2854         if (disk->queue)
2855                 blk_cleanup_queue(disk->queue);
2856         put_disk(disk);
2857 }
2858
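/*
 * Synchronously read "length" bytes at "offset" from the named
 * object into "buf", which must be big enough to hold the result.
 * Returns the number of bytes read on success or a negative errno.
 * A minimal sketch of a caller (cf. rbd_dev_v1_header_read()
 * below):
 *
 *	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 *				0, size, ondisk, &version);
 */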
2859 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2860                                 const char *object_name,
2861                                 u64 offset, u64 length,
2862                                 void *buf, u64 *version)
2864 {
2865         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2866         struct rbd_obj_request *obj_request;
2867         struct page **pages = NULL;
2868         u32 page_count;
2869         size_t size;
2870         int ret;
2871
2872         page_count = (u32) calc_pages_for(offset, length);
2873         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2874         if (IS_ERR(pages))
2875                 return PTR_ERR(pages);
2876
2877         ret = -ENOMEM;
2878         obj_request = rbd_obj_request_create(object_name, offset, length,
2879                                                         OBJ_REQUEST_PAGES);
2880         if (!obj_request)
2881                 goto out;
2882
2883         obj_request->pages = pages;
2884         obj_request->page_count = page_count;
2885
2886         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2887         if (!obj_request->osd_req)
2888                 goto out;
2889
2890         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2891                                         offset, length, 0, 0);
2892         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2893                                         obj_request->pages,
2894                                         obj_request->length,
2895                                         obj_request->offset & ~PAGE_MASK,
2896                                         false, false);
2897         rbd_osd_req_format_read(obj_request);
2898
2899         ret = rbd_obj_request_submit(osdc, obj_request);
2900         if (ret)
2901                 goto out;
2902         ret = rbd_obj_request_wait(obj_request);
2903         if (ret)
2904                 goto out;
2905
2906         ret = obj_request->result;
2907         if (ret < 0)
2908                 goto out;
2909
2910         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2911         size = (size_t) obj_request->xferred;
2912         ceph_copy_from_page_vector(pages, buf, 0, size);
2913         rbd_assert(size <= (size_t) INT_MAX);
2914         ret = (int) size;
2915         if (version)
2916                 *version = obj_request->version;
2917 out:
2918         if (obj_request)
2919                 rbd_obj_request_put(obj_request);
2920         else
2921                 ceph_release_page_vector(pages, page_count);
2922
2923         return ret;
2924 }
2925
2926 /*
2927  * Read the complete header for the given rbd device.
2928  *
2929  * Returns a pointer to a dynamically-allocated buffer containing
2930  * the complete and validated header.  Caller can pass the address
2931  * of a variable that will be filled in with the version of the
2932  * header object at the time it was read.
2933  *
2934  * Returns a pointer-coded errno if a failure occurs.
2935  */
2936 static struct rbd_image_header_ondisk *
2937 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2938 {
2939         struct rbd_image_header_ondisk *ondisk = NULL;
2940         u32 snap_count = 0;
2941         u64 names_size = 0;
2942         u32 want_count;
2943         int ret;
2944
2945         /*
2946          * The complete header will include an array of its 64-bit
2947          * snapshot ids, followed by the names of those snapshots as
2948          * a contiguous block of NUL-terminated strings.  Note that
2949          * the number of snapshots could change by the time we read
2950          * it in, in which case we re-read it.
2951          */
2952         do {
2953                 size_t size;
2954
2955                 kfree(ondisk);
2956
2957                 size = sizeof (*ondisk);
2958                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2959                 size += names_size;
2960                 ondisk = kmalloc(size, GFP_KERNEL);
2961                 if (!ondisk)
2962                         return ERR_PTR(-ENOMEM);
2963
2964                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2965                                        0, size, ondisk, version);
2966                 if (ret < 0)
2967                         goto out_err;
2968                 if (WARN_ON((size_t) ret < size)) {
2969                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2970                                 size, ret);
2971                         ret = -ENXIO;
2972                         goto out_err;
2973                 }
2974                 if (!rbd_dev_ondisk_valid(ondisk)) {
2975                         ret = -ENXIO;
2976                         rbd_warn(rbd_dev, "invalid header");
2977                         goto out_err;
2978                 }
2979
2980                 names_size = le64_to_cpu(ondisk->snap_names_len);
2981                 want_count = snap_count;
2982                 snap_count = le32_to_cpu(ondisk->snap_count);
2983         } while (snap_count != want_count);
2984
2985         return ondisk;
2986
2987 out_err:
2988         kfree(ondisk);
2989
2990         return ERR_PTR(ret);
2991 }
2992
2993 /*
2994  * Reload the on-disk header and fill in the in-memory version
2995  */
2996 static int rbd_read_header(struct rbd_device *rbd_dev,
2997                            struct rbd_image_header *header)
2998 {
2999         struct rbd_image_header_ondisk *ondisk;
3000         u64 ver = 0;
3001         int ret;
3002
3003         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3004         if (IS_ERR(ondisk))
3005                 return PTR_ERR(ondisk);
3006         ret = rbd_header_from_disk(header, ondisk);
3007         if (ret >= 0)
3008                 header->obj_version = ver;
3009         kfree(ondisk);
3010
3011         return ret;
3012 }
3013
3014 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3015 {
3016         struct rbd_snap *snap;
3017         struct rbd_snap *next;
3018
3019         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
3020                 rbd_remove_snap_dev(snap);
3021 }
3022
3023 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3024 {
3025         sector_t size;
3026
3027         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3028                 return;
3029
3030         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3031         dout("setting size to %llu sectors", (unsigned long long) size);
3032         rbd_dev->mapping.size = (u64) size;
3033         set_capacity(rbd_dev->disk, size);
3034 }
3035
3036 /*
3037  * Re-read the v1 on-disk header and update the in-memory copy
3038  */
3039 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3040 {
3041         int ret;
3042         struct rbd_image_header h;
3043
3044         ret = rbd_read_header(rbd_dev, &h);
3045         if (ret < 0)
3046                 return ret;
3047
3048         down_write(&rbd_dev->header_rwsem);
3049
3050         /* Update image size, and check for resize of mapped image */
3051         rbd_dev->header.image_size = h.image_size;
3052         rbd_update_mapping_size(rbd_dev);
3053
3054         /* rbd_dev->header.object_prefix shouldn't change */
3055         kfree(rbd_dev->header.snap_sizes);
3056         kfree(rbd_dev->header.snap_names);
3057         /* osd requests may still refer to snapc */
3058         ceph_put_snap_context(rbd_dev->header.snapc);
3059
3060         if (hver)
3061                 *hver = h.obj_version;
3062         rbd_dev->header.obj_version = h.obj_version;
3063         rbd_dev->header.image_size = h.image_size;
3064         rbd_dev->header.snapc = h.snapc;
3065         rbd_dev->header.snap_names = h.snap_names;
3066         rbd_dev->header.snap_sizes = h.snap_sizes;
3067         /* Free the extra copy of the object prefix */
3068         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3069         kfree(h.object_prefix);
3070
3071         ret = rbd_dev_snaps_update(rbd_dev);
3072         if (!ret)
3073                 ret = rbd_dev_snaps_register(rbd_dev);
3074
3075         up_write(&rbd_dev->header_rwsem);
3076
3077         return ret;
3078 }
3079
3080 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3081 {
3082         int ret;
3083
3084         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3085         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3086         if (rbd_dev->image_format == 1)
3087                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3088         else
3089                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3090         mutex_unlock(&ctl_mutex);
3091         revalidate_disk(rbd_dev->disk);
3092
3093         return ret;
3094 }
3095
3096 static int rbd_init_disk(struct rbd_device *rbd_dev)
3097 {
3098         struct gendisk *disk;
3099         struct request_queue *q;
3100         u64 segment_size;
3101
3102         /* create gendisk info */
3103         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3104         if (!disk)
3105                 return -ENOMEM;
3106
3107         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3108                  rbd_dev->dev_id);
3109         disk->major = rbd_dev->major;
3110         disk->first_minor = 0;
3111         disk->fops = &rbd_bd_ops;
3112         disk->private_data = rbd_dev;
3113
3114         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3115         if (!q)
3116                 goto out_disk;
3117
3118         /* We use the default size, but let's be explicit about it. */
3119         blk_queue_physical_block_size(q, SECTOR_SIZE);
3120
3121         /* set io sizes to object size */
3122         segment_size = rbd_obj_bytes(&rbd_dev->header);
3123         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3124         blk_queue_max_segment_size(q, segment_size);
3125         blk_queue_io_min(q, segment_size);
3126         blk_queue_io_opt(q, segment_size);
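        /*
         * A worked example (hypothetical image, assuming the default
         * object order of 22): objects are 4 MiB, so the calls above
         * cap requests at 8192 512-byte sectors and advertise 4 MiB
         * minimum/optimal I/O sizes.
         */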
3127
3128         blk_queue_merge_bvec(q, rbd_merge_bvec);
3129         disk->queue = q;
3130
3131         q->queuedata = rbd_dev;
3132
3133         rbd_dev->disk = disk;
3134
3135         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3136
3137         return 0;
3138 out_disk:
3139         put_disk(disk);
3140
3141         return -ENOMEM;
3142 }
3143
3144 /*
3145   sysfs
3146 */
3147
3148 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3149 {
3150         return container_of(dev, struct rbd_device, dev);
3151 }
3152
3153 static ssize_t rbd_size_show(struct device *dev,
3154                              struct device_attribute *attr, char *buf)
3155 {
3156         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3157         sector_t size;
3158
3159         down_read(&rbd_dev->header_rwsem);
3160         size = get_capacity(rbd_dev->disk);
3161         up_read(&rbd_dev->header_rwsem);
3162
3163         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3164 }
3165
3166 /*
3167  * Note this shows the features for whatever's mapped, which is not
3168  * necessarily the base image.
3169  */
3170 static ssize_t rbd_features_show(struct device *dev,
3171                              struct device_attribute *attr, char *buf)
3172 {
3173         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3174
3175         return sprintf(buf, "0x%016llx\n",
3176                         (unsigned long long) rbd_dev->mapping.features);
3177 }
3178
3179 static ssize_t rbd_major_show(struct device *dev,
3180                               struct device_attribute *attr, char *buf)
3181 {
3182         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3183
3184         return sprintf(buf, "%d\n", rbd_dev->major);
3185 }
3186
3187 static ssize_t rbd_client_id_show(struct device *dev,
3188                                   struct device_attribute *attr, char *buf)
3189 {
3190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3191
3192         return sprintf(buf, "client%lld\n",
3193                         ceph_client_id(rbd_dev->rbd_client->client));
3194 }
3195
3196 static ssize_t rbd_pool_show(struct device *dev,
3197                              struct device_attribute *attr, char *buf)
3198 {
3199         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3200
3201         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3202 }
3203
3204 static ssize_t rbd_pool_id_show(struct device *dev,
3205                              struct device_attribute *attr, char *buf)
3206 {
3207         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3208
3209         return sprintf(buf, "%llu\n",
3210                 (unsigned long long) rbd_dev->spec->pool_id);
3211 }
3212
3213 static ssize_t rbd_name_show(struct device *dev,
3214                              struct device_attribute *attr, char *buf)
3215 {
3216         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3217
3218         if (rbd_dev->spec->image_name)
3219                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3220
3221         return sprintf(buf, "(unknown)\n");
3222 }
3223
3224 static ssize_t rbd_image_id_show(struct device *dev,
3225                              struct device_attribute *attr, char *buf)
3226 {
3227         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3228
3229         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3230 }
3231
3232 /*
3233  * Shows the name of the currently-mapped snapshot (or
3234  * RBD_SNAP_HEAD_NAME for the base image).
3235  */
3236 static ssize_t rbd_snap_show(struct device *dev,
3237                              struct device_attribute *attr,
3238                              char *buf)
3239 {
3240         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3241
3242         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3243 }
3244
3245 /*
3246  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3247  * for the parent image.  If there is no parent, simply shows
3248  * "(no parent image)".
3249  */
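/*
 * Example of the resulting output for a clone (all ids and names
 * below are hypothetical):
 *
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1028a4cf32d1
 *      image_name parent_image
 *      snap_id 4
 *      snap_name snap1
 *      overlap 1073741824
 */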
3250 static ssize_t rbd_parent_show(struct device *dev,
3251                              struct device_attribute *attr,
3252                              char *buf)
3253 {
3254         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3255         struct rbd_spec *spec = rbd_dev->parent_spec;
3256         int count;
3257         char *bufp = buf;
3258
3259         if (!spec)
3260                 return sprintf(buf, "(no parent image)\n");
3261
3262         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3263                         (unsigned long long) spec->pool_id, spec->pool_name);
3264         if (count < 0)
3265                 return count;
3266         bufp += count;
3267
3268         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3269                         spec->image_name ? spec->image_name : "(unknown)");
3270         if (count < 0)
3271                 return count;
3272         bufp += count;
3273
3274         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3275                         (unsigned long long) spec->snap_id, spec->snap_name);
3276         if (count < 0)
3277                 return count;
3278         bufp += count;
3279
3280         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3281         if (count < 0)
3282                 return count;
3283         bufp += count;
3284
3285         return (ssize_t) (bufp - buf);
3286 }
3287
3288 static ssize_t rbd_image_refresh(struct device *dev,
3289                                  struct device_attribute *attr,
3290                                  const char *buf,
3291                                  size_t size)
3292 {
3293         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294         int ret;
3295
3296         ret = rbd_dev_refresh(rbd_dev, NULL);
3297
3298         return ret < 0 ? ret : size;
3299 }
3300
3301 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3302 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3303 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3304 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3305 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3306 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3307 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3308 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3309 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3310 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3311 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3312
3313 static struct attribute *rbd_attrs[] = {
3314         &dev_attr_size.attr,
3315         &dev_attr_features.attr,
3316         &dev_attr_major.attr,
3317         &dev_attr_client_id.attr,
3318         &dev_attr_pool.attr,
3319         &dev_attr_pool_id.attr,
3320         &dev_attr_name.attr,
3321         &dev_attr_image_id.attr,
3322         &dev_attr_current_snap.attr,
3323         &dev_attr_parent.attr,
3324         &dev_attr_refresh.attr,
3325         NULL
3326 };
3327
3328 static struct attribute_group rbd_attr_group = {
3329         .attrs = rbd_attrs,
3330 };
3331
3332 static const struct attribute_group *rbd_attr_groups[] = {
3333         &rbd_attr_group,
3334         NULL
3335 };
3336
3337 static void rbd_sysfs_dev_release(struct device *dev)
3338 {
3339 }
3340
3341 static struct device_type rbd_device_type = {
3342         .name           = "rbd",
3343         .groups         = rbd_attr_groups,
3344         .release        = rbd_sysfs_dev_release,
3345 };
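/*
 * With the device type above, a mapped device's attributes appear
 * under /sys/bus/rbd/devices/<id>/.  A usage sketch (assuming a
 * hypothetical device with id 0):
 *
 *      $ cat /sys/bus/rbd/devices/0/size
 *      1073741824
 *      $ echo 1 > /sys/bus/rbd/devices/0/refresh
 */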
3346
3347
3348 /*
3349   sysfs - snapshots
3350 */
3351
3352 static ssize_t rbd_snap_size_show(struct device *dev,
3353                                   struct device_attribute *attr,
3354                                   char *buf)
3355 {
3356         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3357
3358         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
3359 }
3360
3361 static ssize_t rbd_snap_id_show(struct device *dev,
3362                                 struct device_attribute *attr,
3363                                 char *buf)
3364 {
3365         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3366
3367         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
3368 }
3369
3370 static ssize_t rbd_snap_features_show(struct device *dev,
3371                                 struct device_attribute *attr,
3372                                 char *buf)
3373 {
3374         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3375
3376         return sprintf(buf, "0x%016llx\n",
3377                         (unsigned long long) snap->features);
3378 }
3379
3380 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3381 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3382 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3383
3384 static struct attribute *rbd_snap_attrs[] = {
3385         &dev_attr_snap_size.attr,
3386         &dev_attr_snap_id.attr,
3387         &dev_attr_snap_features.attr,
3388         NULL,
3389 };
3390
3391 static struct attribute_group rbd_snap_attr_group = {
3392         .attrs = rbd_snap_attrs,
3393 };
3394
3395 static void rbd_snap_dev_release(struct device *dev)
3396 {
3397         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3398         kfree(snap->name);
3399         kfree(snap);
3400 }
3401
3402 static const struct attribute_group *rbd_snap_attr_groups[] = {
3403         &rbd_snap_attr_group,
3404         NULL
3405 };
3406
3407 static struct device_type rbd_snap_device_type = {
3408         .groups         = rbd_snap_attr_groups,
3409         .release        = rbd_snap_dev_release,
3410 };
3411
3412 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3413 {
3414         kref_get(&spec->kref);
3415
3416         return spec;
3417 }
3418
3419 static void rbd_spec_free(struct kref *kref);
3420 static void rbd_spec_put(struct rbd_spec *spec)
3421 {
3422         if (spec)
3423                 kref_put(&spec->kref, rbd_spec_free);
3424 }
3425
3426 static struct rbd_spec *rbd_spec_alloc(void)
3427 {
3428         struct rbd_spec *spec;
3429
3430         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3431         if (!spec)
3432                 return NULL;
3433         kref_init(&spec->kref);
3434
3435         return spec;
3436 }
3437
3438 static void rbd_spec_free(struct kref *kref)
3439 {
3440         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3441
3442         kfree(spec->pool_name);
3443         kfree(spec->image_id);
3444         kfree(spec->image_name);
3445         kfree(spec->snap_name);
3446         kfree(spec);
3447 }
3448
3449 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3450                                 struct rbd_spec *spec)
3451 {
3452         struct rbd_device *rbd_dev;
3453
3454         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3455         if (!rbd_dev)
3456                 return NULL;
3457
3458         spin_lock_init(&rbd_dev->lock);
3459         rbd_dev->flags = 0;
3460         INIT_LIST_HEAD(&rbd_dev->node);
3461         INIT_LIST_HEAD(&rbd_dev->snaps);
3462         init_rwsem(&rbd_dev->header_rwsem);
3463
3464         rbd_dev->spec = spec;
3465         rbd_dev->rbd_client = rbdc;
3466
3467         /* Initialize the layout used for all rbd requests */
3468
3469         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3470         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3471         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3472         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3473
3474         return rbd_dev;
3475 }
3476
3477 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3478 {
3479         rbd_spec_put(rbd_dev->parent_spec);
3480         kfree(rbd_dev->header_name);
3481         rbd_put_client(rbd_dev->rbd_client);
3482         rbd_spec_put(rbd_dev->spec);
3483         kfree(rbd_dev);
3484 }
3485
3486 static bool rbd_snap_registered(struct rbd_snap *snap)
3487 {
3488         bool ret = snap->dev.type == &rbd_snap_device_type;
3489         bool reg = device_is_registered(&snap->dev);
3490
3491         rbd_assert(!ret ^ reg);
3492
3493         return ret;
3494 }
3495
3496 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3497 {
3498         list_del(&snap->node);
3499         if (device_is_registered(&snap->dev))
3500                 device_unregister(&snap->dev);
3501 }
3502
3503 static int rbd_register_snap_dev(struct rbd_snap *snap,
3504                                   struct device *parent)
3505 {
3506         struct device *dev = &snap->dev;
3507         int ret;
3508
3509         dev->type = &rbd_snap_device_type;
3510         dev->parent = parent;
3511         dev->release = rbd_snap_dev_release;
3512         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3513         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3514
3515         ret = device_register(dev);
3516
3517         return ret;
3518 }
3519
3520 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3521                                                 const char *snap_name,
3522                                                 u64 snap_id, u64 snap_size,
3523                                                 u64 snap_features)
3524 {
3525         struct rbd_snap *snap;
3526         int ret;
3527
3528         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3529         if (!snap)
3530                 return ERR_PTR(-ENOMEM);
3531
3532         ret = -ENOMEM;
3533         snap->name = kstrdup(snap_name, GFP_KERNEL);
3534         if (!snap->name)
3535                 goto err;
3536
3537         snap->id = snap_id;
3538         snap->size = snap_size;
3539         snap->features = snap_features;
3540
3541         return snap;
3542
3543 err:
3544         kfree(snap->name);
3545         kfree(snap);
3546
3547         return ERR_PTR(ret);
3548 }
3549
3550 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3551                 u64 *snap_size, u64 *snap_features)
3552 {
3553         char *snap_name;
3554
3555         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3556
3557         *snap_size = rbd_dev->header.snap_sizes[which];
3558         *snap_features = 0;     /* No features for v1 */
3559
3560         /* Skip over names until we find the one we are looking for */
3561
3562         snap_name = rbd_dev->header.snap_names;
3563         while (which--)
3564                 snap_name += strlen(snap_name) + 1;
3565
3566         return snap_name;
3567 }
3568
3569 /*
3570  * Get the size and object order for an image snapshot, or if
3571  * snap_id is CEPH_NOSNAP, gets this information for the base
3572  * image.
3573  */
3574 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3575                                 u8 *order, u64 *snap_size)
3576 {
3577         __le64 snapid = cpu_to_le64(snap_id);
3578         int ret;
3579         struct {
3580                 u8 order;
3581                 __le64 size;
3582         } __attribute__ ((packed)) size_buf = { 0 };
3583
3584         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3585                                 "rbd", "get_size",
3586                                 &snapid, sizeof (snapid),
3587                                 &size_buf, sizeof (size_buf), NULL);
3588         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3589         if (ret < 0)
3590                 return ret;
3591         if (ret < sizeof (size_buf))
3592                 return -ERANGE;
3593
3594         *order = size_buf.order;
3595         *snap_size = le64_to_cpu(size_buf.size);
3596
3597         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3598                 (unsigned long long)snap_id, (unsigned int)*order,
3599                 (unsigned long long)*snap_size);
3600
3601         return 0;
3602 }
3603
3604 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3605 {
3606         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3607                                         &rbd_dev->header.obj_order,
3608                                         &rbd_dev->header.image_size);
3609 }
3610
3611 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3612 {
3613         void *reply_buf;
3614         int ret;
3615         void *p;
3616
3617         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3618         if (!reply_buf)
3619                 return -ENOMEM;
3620
3621         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3622                                 "rbd", "get_object_prefix", NULL, 0,
3623                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3624         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3625         if (ret < 0)
3626                 goto out;
3627
3628         p = reply_buf;
3629         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3630                                                 p + ret, NULL, GFP_NOIO);
3631         ret = 0;
3632
3633         if (IS_ERR(rbd_dev->header.object_prefix)) {
3634                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3635                 rbd_dev->header.object_prefix = NULL;
3636         } else {
3637                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3638         }
3639 out:
3640         kfree(reply_buf);
3641
3642         return ret;
3643 }
3644
3645 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3646                 u64 *snap_features)
3647 {
3648         __le64 snapid = cpu_to_le64(snap_id);
3649         struct {
3650                 __le64 features;
3651                 __le64 incompat;
3652         } __attribute__ ((packed)) features_buf = { 0 };
3653         u64 incompat;
3654         int ret;
3655
3656         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3657                                 "rbd", "get_features",
3658                                 &snapid, sizeof (snapid),
3659                                 &features_buf, sizeof (features_buf), NULL);
3660         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3661         if (ret < 0)
3662                 return ret;
3663         if (ret < sizeof (features_buf))
3664                 return -ERANGE;
3665
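        /*
         * An illustration of the check below: RBD_FEATURES_SUPPORTED
         * is currently 0, so any bit set in the image's incompat mask
         * (for example RBD_FEATURE_LAYERING) causes the map attempt
         * to be refused with -ENXIO.
         */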
3666         incompat = le64_to_cpu(features_buf.incompat);
3667         if (incompat & ~RBD_FEATURES_SUPPORTED)
3668                 return -ENXIO;
3669
3670         *snap_features = le64_to_cpu(features_buf.features);
3671
3672         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3673                 (unsigned long long)snap_id,
3674                 (unsigned long long)*snap_features,
3675                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3676
3677         return 0;
3678 }
3679
3680 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3681 {
3682         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3683                                                 &rbd_dev->header.features);
3684 }
3685
3686 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3687 {
3688         struct rbd_spec *parent_spec;
3689         size_t size;
3690         void *reply_buf = NULL;
3691         __le64 snapid;
3692         void *p;
3693         void *end;
3694         char *image_id;
3695         u64 overlap;
3696         int ret;
3697
3698         parent_spec = rbd_spec_alloc();
3699         if (!parent_spec)
3700                 return -ENOMEM;
3701
3702         size = sizeof (__le64) +                                /* pool_id */
3703                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3704                 sizeof (__le64) +                               /* snap_id */
3705                 sizeof (__le64);                                /* overlap */
3706         reply_buf = kmalloc(size, GFP_KERNEL);
3707         if (!reply_buf) {
3708                 ret = -ENOMEM;
3709                 goto out_err;
3710         }
3711
3712         snapid = cpu_to_le64(CEPH_NOSNAP);
3713         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3714                                 "rbd", "get_parent",
3715                                 &snapid, sizeof (snapid),
3716                                 reply_buf, size, NULL);
3717         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3718         if (ret < 0)
3719                 goto out_err;
3720
3721         p = reply_buf;
3722         end = reply_buf + ret;
3723         ret = -ERANGE;
3724         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3725         if (parent_spec->pool_id == CEPH_NOPOOL)
3726                 goto out;       /* No parent?  No problem. */
3727
3728         /* The ceph file layout needs to fit pool id in 32 bits */
3729
3730         ret = -EIO;
3731         if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
3732                 goto out_err;
3733
3734         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3735         if (IS_ERR(image_id)) {
3736                 ret = PTR_ERR(image_id);
3737                 goto out_err;
3738         }
3739         parent_spec->image_id = image_id;
3740         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3741         ceph_decode_64_safe(&p, end, overlap, out_err);
3742
3743         rbd_dev->parent_overlap = overlap;
3744         rbd_dev->parent_spec = parent_spec;
3745         parent_spec = NULL;     /* rbd_dev now owns this */
3746 out:
3747         ret = 0;
3748 out_err:
3749         kfree(reply_buf);
3750         rbd_spec_put(parent_spec);
3751
3752         return ret;
3753 }
3754
3755 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3756 {
3757         struct {
3758                 __le64 stripe_unit;
3759                 __le64 stripe_count;
3760         } __attribute__ ((packed)) striping_info_buf = { 0 };
3761         size_t size = sizeof (striping_info_buf);
3762         void *p;
3763         u64 obj_size;
3764         u64 stripe_unit;
3765         u64 stripe_count;
3766         int ret;
3767
3768         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3769                                 "rbd", "get_stripe_unit_count", NULL, 0,
3770                                 (char *)&striping_info_buf, size, NULL);
3771         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3772         if (ret < 0)
3773                 return ret;
3774         if (ret < size)
3775                 return -ERANGE;
3776
3777         /*
3778          * We don't actually support the "fancy striping" feature
3779          * (STRIPINGV2) yet, but if the striping sizes are the
3780          * defaults the behavior is the same as before.  So find
3781          * out, and only fail if the image has non-default values.
3782          */
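        /*
         * Example (hypothetical image, assuming the default object
         * order of 22): objects are 4 MiB, so a default-striped image
         * must report stripe_unit == 4194304 and stripe_count == 1;
         * any other values fail the checks below with -EINVAL.
         */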
3783         ret = -EINVAL;
3784         obj_size = (u64)1 << rbd_dev->header.obj_order;
3785         p = &striping_info_buf;
3786         stripe_unit = ceph_decode_64(&p);
3787         if (stripe_unit != obj_size) {
3788                 rbd_warn(rbd_dev, "unsupported stripe unit "
3789                                 "(got %llu want %llu)",
3790                                 stripe_unit, obj_size);
3791                 return -EINVAL;
3792         }
3793         stripe_count = ceph_decode_64(&p);
3794         if (stripe_count != 1) {
3795                 rbd_warn(rbd_dev, "unsupported stripe count "
3796                                 "(got %llu want 1)", stripe_count);
3797                 return -EINVAL;
3798         }
3799         rbd_dev->stripe_unit = stripe_unit;
3800         rbd_dev->stripe_count = stripe_count;
3801
3802         return 0;
3803 }
3804
3805 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3806 {
3807         size_t image_id_size;
3808         char *image_id;
3809         void *p;
3810         void *end;
3811         size_t size;
3812         void *reply_buf = NULL;
3813         size_t len = 0;
3814         char *image_name = NULL;
3815         int ret;
3816
3817         rbd_assert(!rbd_dev->spec->image_name);
3818
3819         len = strlen(rbd_dev->spec->image_id);
3820         image_id_size = sizeof (__le32) + len;
3821         image_id = kmalloc(image_id_size, GFP_KERNEL);
3822         if (!image_id)
3823                 return NULL;
3824
3825         p = image_id;
3826         end = image_id + image_id_size;
3827         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3828
3829         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3830         reply_buf = kmalloc(size, GFP_KERNEL);
3831         if (!reply_buf)
3832                 goto out;
3833
3834         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3835                                 "rbd", "dir_get_name",
3836                                 image_id, image_id_size,
3837                                 reply_buf, size, NULL);
3838         if (ret < 0)
3839                 goto out;
3840         p = reply_buf;
3841         end = reply_buf + size;
3842         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3843         if (IS_ERR(image_name))
3844                 image_name = NULL;
3845         else
3846                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3847 out:
3848         kfree(reply_buf);
3849         kfree(image_id);
3850
3851         return image_name;
3852 }
3853
3854 /*
3855  * When a parent image gets probed, we only have the pool, image,
3856  * and snapshot ids but not the names of any of them.  This call
3857  * is made later to fill in those names.  It has to be done after
3858  * rbd_dev_snaps_update() has completed because some of the
3859  * information (in particular, snapshot name) is not available
3860  * until then.
3861  */
3862 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3863 {
3864         struct ceph_osd_client *osdc;
3865         const char *name;
3866         void *reply_buf = NULL;
3867         int ret;
3868
3869         if (rbd_dev->spec->pool_name)
3870                 return 0;       /* Already have the names */
3871
3872         /* Look up the pool name */
3873
3874         osdc = &rbd_dev->rbd_client->client->osdc;
3875         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3876         if (!name) {
3877                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3878                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3879                 return -EIO;
3880         }
3881
3882         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3883         if (!rbd_dev->spec->pool_name)
3884                 return -ENOMEM;
3885
3886         /* Fetch the image name; tolerate failure here */
3887
3888         name = rbd_dev_image_name(rbd_dev);
3889         if (name)
3890                 rbd_dev->spec->image_name = (char *)name;
3891         else
3892                 rbd_warn(rbd_dev, "unable to get image name");
3893
3894         /* Look up the snapshot name. */
3895
3896         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3897         if (!name) {
3898                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3899                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3900                 ret = -EIO;
3901                 goto out_err;
3902         }
3903         ret = -ENOMEM;
3904         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3905         if (!rbd_dev->spec->snap_name)
3906                 goto out_err;
3906
3907         return 0;
3908 out_err:
3909         kfree(reply_buf);
3910         kfree(rbd_dev->spec->pool_name);
3911         rbd_dev->spec->pool_name = NULL;
3912
3913         return ret;
3914 }
3915
3916 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3917 {
3918         size_t size;
3919         int ret;
3920         void *reply_buf;
3921         void *p;
3922         void *end;
3923         u64 seq;
3924         u32 snap_count;
3925         struct ceph_snap_context *snapc;
3926         u32 i;
3927
3928         /*
3929          * We'll need room for the seq value (maximum snapshot id),
3930          * snapshot count, and array of that many snapshot ids.
3931          * For now we have a fixed upper limit on the number we're
3932          * prepared to receive.
3933          */
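        /*
         * A sketch of the reply encoding this expects (as decoded
         * below):
         *
         *      __le64 seq;                       maximum snapshot id
         *      __le32 snap_count;
         *      __le64 snaps[snap_count];         snapshot ids
         */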
3934         size = sizeof (__le64) + sizeof (__le32) +
3935                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3936         reply_buf = kzalloc(size, GFP_KERNEL);
3937         if (!reply_buf)
3938                 return -ENOMEM;
3939
3940         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3941                                 "rbd", "get_snapcontext", NULL, 0,
3942                                 reply_buf, size, ver);
3943         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3944         if (ret < 0)
3945                 goto out;
3946
3947         p = reply_buf;
3948         end = reply_buf + ret;
3949         ret = -ERANGE;
3950         ceph_decode_64_safe(&p, end, seq, out);
3951         ceph_decode_32_safe(&p, end, snap_count, out);
3952
3953         /*
3954          * Make sure the reported number of snapshot ids wouldn't go
3955          * beyond the end of our buffer.  But before checking that,
3956          * make sure the computed size of the snapshot context we
3957          * allocate is representable in a size_t.
3958          */
3959         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3960                                  / sizeof (u64)) {
3961                 ret = -EINVAL;
3962                 goto out;
3963         }
3964         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3965                 goto out;
3966
3967         size = sizeof (struct ceph_snap_context) +
3968                                 snap_count * sizeof (snapc->snaps[0]);
3969         snapc = kmalloc(size, GFP_KERNEL);
3970         if (!snapc) {
3971                 ret = -ENOMEM;
3972                 goto out;
3973         }
3974         ret = 0;
3975
3976         atomic_set(&snapc->nref, 1);
3977         snapc->seq = seq;
3978         snapc->num_snaps = snap_count;
3979         for (i = 0; i < snap_count; i++)
3980                 snapc->snaps[i] = ceph_decode_64(&p);
3981
3982         rbd_dev->header.snapc = snapc;
3983
3984         dout("  snap context seq = %llu, snap_count = %u\n",
3985                 (unsigned long long)seq, (unsigned int)snap_count);
3986 out:
3987         kfree(reply_buf);
3988
3989         return ret;
3990 }
3991
3992 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3993 {
3994         size_t size;
3995         void *reply_buf;
3996         __le64 snap_id;
3997         int ret;
3998         void *p;
3999         void *end;
4000         char *snap_name;
4001
4002         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4003         reply_buf = kmalloc(size, GFP_KERNEL);
4004         if (!reply_buf)
4005                 return ERR_PTR(-ENOMEM);
4006
4007         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
4008         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4009                                 "rbd", "get_snapshot_name",
4010                                 &snap_id, sizeof (snap_id),
4011                                 reply_buf, size, NULL);
4012         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4013         if (ret < 0)
4014                 goto out;
4015
4016         p = reply_buf;
4017         end = reply_buf + size;
4018         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4019         if (IS_ERR(snap_name)) {
4020                 ret = PTR_ERR(snap_name);
4021                 goto out;
4022         } else {
4023                 dout("  snap_id 0x%016llx snap_name = %s\n",
4024                         (unsigned long long)le64_to_cpu(snap_id), snap_name);
4025         }
4026         kfree(reply_buf);
4027
4028         return snap_name;
4029 out:
4030         kfree(reply_buf);
4031
4032         return ERR_PTR(ret);
4033 }
4034
4035 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
4036                 u64 *snap_size, u64 *snap_features)
4037 {
4038         u64 snap_id;
4039         u8 order;
4040         int ret;
4041
4042         snap_id = rbd_dev->header.snapc->snaps[which];
4043         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
4044         if (ret)
4045                 return ERR_PTR(ret);
4046         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
4047         if (ret)
4048                 return ERR_PTR(ret);
4049
4050         return rbd_dev_v2_snap_name(rbd_dev, which);
4051 }
4052
4053 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4054                 u64 *snap_size, u64 *snap_features)
4055 {
4056         if (rbd_dev->image_format == 1)
4057                 return rbd_dev_v1_snap_info(rbd_dev, which,
4058                                         snap_size, snap_features);
4059         if (rbd_dev->image_format == 2)
4060                 return rbd_dev_v2_snap_info(rbd_dev, which,
4061                                         snap_size, snap_features);
4062         return ERR_PTR(-EINVAL);
4063 }
4064
4065 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4066 {
4067         int ret;
4068         __u8 obj_order;
4069
4070         down_write(&rbd_dev->header_rwsem);
4071
4072         /* Grab old order first, to see if it changes */
4073
4074         obj_order = rbd_dev->header.obj_order;
4075         ret = rbd_dev_v2_image_size(rbd_dev);
4076         if (ret)
4077                 goto out;
4078         if (rbd_dev->header.obj_order != obj_order) {
4079                 ret = -EIO;
4080                 goto out;
4081         }
4082         rbd_update_mapping_size(rbd_dev);
4083
4084         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4085         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4086         if (ret)
4087                 goto out;
4088         ret = rbd_dev_snaps_update(rbd_dev);
4089         dout("rbd_dev_snaps_update returned %d\n", ret);
4090         if (ret)
4091                 goto out;
4092         ret = rbd_dev_snaps_register(rbd_dev);
4093         dout("rbd_dev_snaps_register returned %d\n", ret);
4094 out:
4095         up_write(&rbd_dev->header_rwsem);
4096
4097         return ret;
4098 }
4099
4100 /*
4101  * Scan the rbd device's current snapshot list and compare it to the
4102  * newly-received snapshot context.  Remove any existing snapshots
4103  * not present in the new snapshot context.  Add a new snapshot for
4104  * any snapshots in the snapshot context not in the current list.
4105  * And verify there are no changes to snapshots we already know
4106  * about.
4107  *
4108  * Assumes the snapshots in the snapshot context are sorted by
4109  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4110  * are also maintained in that order.)
4111  */
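/*
 * A worked example (hypothetical snapshot ids): if the current list
 * holds {8, 5, 2} and the new context reports {8, 3, 2}, the walk
 * below removes 5, creates 3 (inserted ahead of 2), and verifies
 * that 8 and 2 are unchanged.
 */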
4112 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4113 {
4114         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4115         const u32 snap_count = snapc->num_snaps;
4116         struct list_head *head = &rbd_dev->snaps;
4117         struct list_head *links = head->next;
4118         u32 index = 0;
4119
4120         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
4121         while (index < snap_count || links != head) {
4122                 u64 snap_id;
4123                 struct rbd_snap *snap;
4124                 char *snap_name;
4125                 u64 snap_size = 0;
4126                 u64 snap_features = 0;
4127
4128                 snap_id = index < snap_count ? snapc->snaps[index]
4129                                              : CEPH_NOSNAP;
4130                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4131                                      : NULL;
4132                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4133
4134                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4135                         struct list_head *next = links->next;
4136
4137                         /*
4138                          * A previously-existing snapshot is not in
4139                          * the new snap context.
4140                          *
4141                          * If the now missing snapshot is the one the
4142                          * image is mapped to, clear its exists flag
4143                          * so we can avoid sending any more requests
4144                          * to it.
4145                          */
4146                         if (rbd_dev->spec->snap_id == snap->id)
4147                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4148                         rbd_remove_snap_dev(snap);
4149                         dout("%ssnap id %llu has been removed\n",
4150                                 rbd_dev->spec->snap_id == snap->id ?
4151                                                         "mapped " : "",
4152                                 (unsigned long long) snap->id);
4153
4154                         /* Done with this list entry; advance */
4155
4156                         links = next;
4157                         continue;
4158                 }
4159
4160                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4161                                         &snap_size, &snap_features);
4162                 if (IS_ERR(snap_name))
4163                         return PTR_ERR(snap_name);
4164
4165                 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
4166                         (unsigned long long) snap_id);
4167                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4168                         struct rbd_snap *new_snap;
4169
4170                         /* We haven't seen this snapshot before */
4171
4172                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
4173                                         snap_id, snap_size, snap_features);
4174                         if (IS_ERR(new_snap)) {
4175                                 int err = PTR_ERR(new_snap);
4176
4177                                 dout("  failed to add dev, error %d\n", err);
4178
4179                                 return err;
4180                         }
4181
4182                         /* New goes before existing, or at end of list */
4183
4184                         dout("  added dev%s\n", snap ? "" : " at end");
4185                         if (snap)
4186                                 list_add_tail(&new_snap->node, &snap->node);
4187                         else
4188                                 list_add_tail(&new_snap->node, head);
4189                 } else {
4190                         /* Already have this one */
4191
4192                         dout("  already present\n");
4193
4194                         rbd_assert(snap->size == snap_size);
4195                         rbd_assert(!strcmp(snap->name, snap_name));
4196                         rbd_assert(snap->features == snap_features);
4197
4198                         /* Done with this list entry; advance */
4199
4200                         links = links->next;
4201                 }
4202
4203                 /* Advance to the next entry in the snapshot context */
4204
4205                 index++;
4206         }
4207         dout("%s: done\n", __func__);
4208
4209         return 0;
4210 }
4211
4212 /*
4213  * Scan the list of snapshots and register the devices for any that
4214  * have not already been registered.
4215  */
4216 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
4217 {
4218         struct rbd_snap *snap;
4219         int ret = 0;
4220
4221         dout("%s:\n", __func__);
4222         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
4223                 return -EIO;
4224
4225         list_for_each_entry(snap, &rbd_dev->snaps, node) {
4226                 if (!rbd_snap_registered(snap)) {
4227                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
4228                         if (ret < 0)
4229                                 break;
4230                 }
4231         }
4232         dout("%s: returning %d\n", __func__, ret);
4233
4234         return ret;
4235 }
4236
4237 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4238 {
4239         struct device *dev;
4240         int ret;
4241
4242         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4243
4244         dev = &rbd_dev->dev;
4245         dev->bus = &rbd_bus_type;
4246         dev->type = &rbd_device_type;
4247         dev->parent = &rbd_root_dev;
4248         dev->release = rbd_dev_release;
4249         dev_set_name(dev, "%d", rbd_dev->dev_id);
4250         ret = device_register(dev);
4251
4252         mutex_unlock(&ctl_mutex);
4253
4254         return ret;
4255 }
4256
4257 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4258 {
4259         device_unregister(&rbd_dev->dev);
4260 }
4261
4262 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4263
4264 /*
4265  * Get a unique rbd identifier for the given new rbd_dev, and add
4266  * the rbd_dev to the global list.  The minimum rbd id is 1.
4267  */
4268 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4269 {
4270         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4271
4272         spin_lock(&rbd_dev_list_lock);
4273         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4274         spin_unlock(&rbd_dev_list_lock);
4275         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4276                 (unsigned long long) rbd_dev->dev_id);
4277 }
4278
4279 /*
4280  * Remove an rbd_dev from the global list, and record that its
4281  * identifier is no longer in use.
4282  */
4283 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4284 {
4285         struct list_head *tmp;
4286         int rbd_id = rbd_dev->dev_id;
4287         int max_id;
4288
4289         rbd_assert(rbd_id > 0);
4290
4291         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4292                 (unsigned long long) rbd_dev->dev_id);
4293         spin_lock(&rbd_dev_list_lock);
4294         list_del_init(&rbd_dev->node);
4295
4296         /*
4297          * If the id being "put" is not the current maximum, there
4298          * is nothing special we need to do.
4299          */
4300         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4301                 spin_unlock(&rbd_dev_list_lock);
4302                 return;
4303         }
4304
4305         /*
4306          * We need to update the current maximum id.  Search the
4307          * list to find out what it is.  We're more likely to find
4308          * the maximum at the end, so search the list backward.
4309          */
4310         max_id = 0;
4311         list_for_each_prev(tmp, &rbd_dev_list) {
4312                 struct rbd_device *rbd_dev;
4313
4314                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4315                 if (rbd_dev->dev_id > max_id)
4316                         max_id = rbd_dev->dev_id;
4317         }
4318         spin_unlock(&rbd_dev_list_lock);
4319
4320         /*
4321          * The max id could have been updated by rbd_dev_id_get(), in
4322          * which case it now accurately reflects the new maximum.
4323          * Be careful not to overwrite the maximum value in that
4324          * case.
4325          */
4326         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4327         dout("  max dev id has been reset\n");
4328 }
4329
4330 /*
4331  * Skips over white space at *buf, and updates *buf to point to the
4332  * first found non-space character (if any). Returns the length of
4333  * the token (string of non-white space characters) found.  Note
4334  * that *buf must be terminated with '\0'.
4335  */
4336 static inline size_t next_token(const char **buf)
4337 {
4338         /*
4339          * These are the characters that produce nonzero for
4340          * isspace() in the "C" and "POSIX" locales.
4341          */
4342         const char *spaces = " \f\n\r\t\v";
4343
4344         *buf += strspn(*buf, spaces);   /* Find start of token */
4345
4346         return strcspn(*buf, spaces);   /* Return token length */
4347 }
4348
4349 /*
4350  * Finds the next token in *buf, and if the provided token buffer is
4351  * big enough, copies the found token into it.  The result, if
4352  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4353  * must be terminated with '\0' on entry.
4354  *
4355  * Returns the length of the token found (not including the '\0').
4356  * Return value will be 0 if no token is found, and it will be >=
4357  * token_size if the token would not fit.
4358  *
4359  * The *buf pointer will be updated to point beyond the end of the
4360  * found token.  Note that this occurs even if the token buffer is
4361  * too small to hold it.
4362  */
4363 static inline size_t copy_token(const char **buf,
4364                                 char *token,
4365                                 size_t token_size)
4366 {
4367         size_t len;
4368
4369         len = next_token(buf);
4370         if (len < token_size) {
4371                 memcpy(token, *buf, len);
4372                 *(token + len) = '\0';
4373         }
4374         *buf += len;
4375
4376         return len;
4377 }
4378
4379 /*
4380  * Finds the next token in *buf, dynamically allocates a buffer big
4381  * enough to hold a copy of it, and copies the token into the new
4382  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4383  * that a duplicate buffer is created even for a zero-length token.
4384  *
4385  * Returns a pointer to the newly-allocated duplicate, or a null
4386  * pointer if memory for the duplicate was not available.  If
4387  * the lenp argument is a non-null pointer, the length of the token
4388  * (not including the '\0') is returned in *lenp.
4389  *
4390  * If successful, the *buf pointer will be updated to point beyond
4391  * the end of the found token.
4392  *
4393  * Note: uses GFP_KERNEL for allocation.
4394  */
4395 static inline char *dup_token(const char **buf, size_t *lenp)
4396 {
4397         char *dup;
4398         size_t len;
4399
4400         len = next_token(buf);
4401         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4402         if (!dup)
4403                 return NULL;
4404         *(dup + len) = '\0';
4405         *buf += len;
4406
4407         if (lenp)
4408                 *lenp = len;
4409
4410         return dup;
4411 }
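/*
 * A minimal sketch of how the token helpers combine (the input
 * string is hypothetical):
 *
 *      const char *buf = "rbd myimage mysnap";
 *      char *pool = dup_token(&buf, NULL);     yields "rbd"
 *      char *image = dup_token(&buf, NULL);    yields "myimage"
 *      size_t len = next_token(&buf);          6, buf at "mysnap"
 */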
4412
4413 /*
4414  * Parse the options provided for an "rbd add" (i.e., rbd image
4415  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4416  * and the data written is passed here via a NUL-terminated buffer.
4417  * Returns 0 if successful or an error code otherwise.
4418  *
4419  * The information extracted from these options is recorded in
4420  * the other parameters which return dynamically-allocated
4421  * structures:
4422  *  ceph_opts
4423  *      The address of a pointer that will refer to a ceph options
4424  *      structure.  Caller must release the returned pointer using
4425  *      ceph_destroy_options() when it is no longer needed.
4426  *  rbd_opts
4427  *      Address of an rbd options pointer.  Fully initialized by
4428  *      this function; caller must release with kfree().
4429  *  spec
4430  *      Address of an rbd image specification pointer.  Fully
4431  *      initialized by this function based on parsed options.
4432  *      Caller must release with rbd_spec_put().
4433  *
4434  * The options passed take this form:
4435  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4436  * where:
4437  *  <mon_addrs>
4438  *      A comma-separated list of one or more monitor addresses.
4439  *      A monitor address is an ip address, optionally followed
4440  *      by a port number (separated by a colon).
4441  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4442  *  <options>
4443  *      A comma-separated list of ceph and/or rbd options.
4444  *  <pool_name>
4445  *      The name of the rados pool containing the rbd image.
4446  *  <image_name>
4447  *      The name of the image in that pool to map.
4448  *  <snap_name>
4449  *      An optional snapshot name.  If provided, the mapping will
4450  *      present data from the image at the time that snapshot was
4451  *      created.  The image head is used if no snapshot name is
4452  *      provided.  Snapshot mappings are always read-only.
4453  */
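/*
 * For example, a write like the following (monitor address, key,
 * and image name are all hypothetical):
 *
 *      $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *              > /sys/bus/rbd/add
 *
 * maps the head of image "foo" in pool "rbd" via the monitor at
 * 1.2.3.4:6789.
 */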
4454 static int rbd_add_parse_args(const char *buf,
4455                                 struct ceph_options **ceph_opts,
4456                                 struct rbd_options **opts,
4457                                 struct rbd_spec **rbd_spec)
4458 {
4459         size_t len;
4460         char *options;
4461         const char *mon_addrs;
4462         size_t mon_addrs_size;
4463         struct rbd_spec *spec = NULL;
4464         struct rbd_options *rbd_opts = NULL;
4465         struct ceph_options *copts;
4466         int ret;
4467
4468         /* The first four tokens are required */
4469
4470         len = next_token(&buf);
4471         if (!len) {
4472                 rbd_warn(NULL, "no monitor address(es) provided");
4473                 return -EINVAL;
4474         }
4475         mon_addrs = buf;
4476         mon_addrs_size = len + 1;
4477         buf += len;
4478
4479         ret = -EINVAL;
4480         options = dup_token(&buf, NULL);
4481         if (!options)
4482                 return -ENOMEM;
4483         if (!*options) {
4484                 rbd_warn(NULL, "no options provided");
4485                 goto out_err;
4486         }
4487
4488         spec = rbd_spec_alloc();
4489         if (!spec)
4490                 goto out_mem;
4491
4492         spec->pool_name = dup_token(&buf, NULL);
4493         if (!spec->pool_name)
4494                 goto out_mem;
4495         if (!*spec->pool_name) {
4496                 rbd_warn(NULL, "no pool name provided");
4497                 goto out_err;
4498         }
4499
4500         spec->image_name = dup_token(&buf, NULL);
4501         if (!spec->image_name)
4502                 goto out_mem;
4503         if (!*spec->image_name) {
4504                 rbd_warn(NULL, "no image name provided");
4505                 goto out_err;
4506         }
4507
4508         /*
4509          * Snapshot name is optional; default is to use "-"
4510          * (indicating the head/no snapshot).
4511          */
4512         len = next_token(&buf);
4513         if (!len) {
4514                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4515                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4516         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4517                 ret = -ENAMETOOLONG;
4518                 goto out_err;
4519         }
4520         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4521         if (!spec->snap_name)
4522                 goto out_mem;
4523         *(spec->snap_name + len) = '\0';
4524
4525         /* Initialize all rbd options to the defaults */
4526
4527         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4528         if (!rbd_opts)
4529                 goto out_mem;
4530
4531         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4532
4533         copts = ceph_parse_options(options, mon_addrs,
4534                                         mon_addrs + mon_addrs_size - 1,
4535                                         parse_rbd_opts_token, rbd_opts);
4536         if (IS_ERR(copts)) {
4537                 ret = PTR_ERR(copts);
4538                 goto out_err;
4539         }
4540         kfree(options);
4541
4542         *ceph_opts = copts;
4543         *opts = rbd_opts;
4544         *rbd_spec = spec;
4545
4546         return 0;
4547 out_mem:
4548         ret = -ENOMEM;
4549 out_err:
4550         kfree(rbd_opts);
4551         rbd_spec_put(spec);
4552         kfree(options);
4553
4554         return ret;
4555 }
4556
4557 /*
4558  * An rbd format 2 image has a unique identifier, distinct from the
4559  * name given to it by the user.  Internally, that identifier is
4560  * what's used to specify the names of objects related to the image.
4561  *
4562  * A special "rbd id" object is used to map an rbd image name to its
4563  * id.  If that object doesn't exist, then there is no v2 rbd image
4564  * with the supplied name.
4565  *
4566  * This function will fill in the given rbd_dev's image_id field
4567  * if it can be determined, and in that case will return 0.  If
4568  * any errors occur, a negative errno will be returned and the
4569  * rbd_dev's image_id field will be unchanged (and should be NULL).
4570  */
4571 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4572 {
4573         int ret;
4574         size_t size;
4575         char *object_name;
4576         void *response;
4577         void *p;
4578
4579         /*
4580          * If we already have it we don't need to look it up.
4581          * In particular, when probing a parent image the image id
4582          * is already known (and the image name likely is not), so
4583          * there is no need to fetch the image id again in that case.
4584          */
4585         if (rbd_dev->spec->image_id)
4586                 return 0;
4591
4592         /*
4593          * First, see if the format 2 image id file exists, and if
4594          * so, get the image's persistent id from it.
4595          */
4596         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4597         object_name = kmalloc(size, GFP_NOIO);
4598         if (!object_name)
4599                 return -ENOMEM;
4600         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4601         dout("rbd id object name is %s\n", object_name);
4602
4603         /* Response will be an encoded string, which includes a length */
4604
4605         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4606         response = kzalloc(size, GFP_NOIO);
4607         if (!response) {
4608                 ret = -ENOMEM;
4609                 goto out;
4610         }
4611
4612         ret = rbd_obj_method_sync(rbd_dev, object_name,
4613                                 "rbd", "get_id", NULL, 0,
4614                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4615         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4616         if (ret < 0)
4617                 goto out;
4618
4619         p = response;
4620         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4621                                                 p + ret,
4622                                                 NULL, GFP_NOIO);
4623         ret = 0;
4624
4625         if (IS_ERR(rbd_dev->spec->image_id)) {
4626                 ret = PTR_ERR(rbd_dev->spec->image_id);
4627                 rbd_dev->spec->image_id = NULL;
4628         } else {
4629                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4630         }
4631 out:
4632         kfree(response);
4633         kfree(object_name);
4634
4635         return ret;
4636 }
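
/*
 * Illustrative example: for an image named "foo", the id object is
 * "rbd_id.foo" (RBD_ID_PREFIX, defined in rbd_types.h, is "rbd_id."),
 * and a successful "rbd" class "get_id" method call returns the
 * image's persistent id as a length-prefixed encoded string.
 */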
4637
4638 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4639 {
4640         int ret;
4641         size_t size;
4642
4643         /* Version 1 images have no id; empty string is used */
4644
4645         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4646         if (!rbd_dev->spec->image_id)
4647                 return -ENOMEM;
4648
4649         /* Record the header object name for this rbd image. */
4650
4651         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4652         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4653         if (!rbd_dev->header_name) {
4654                 ret = -ENOMEM;
4655                 goto out_err;
4656         }
4657         sprintf(rbd_dev->header_name, "%s%s",
4658                 rbd_dev->spec->image_name, RBD_SUFFIX);
4659
4660         /* Populate rbd image metadata */
4661
4662         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4663         if (ret < 0)
4664                 goto out_err;
4665
4666         /* Version 1 images have no parent (no layering) */
4667
4668         rbd_dev->parent_spec = NULL;
4669         rbd_dev->parent_overlap = 0;
4670
4671         rbd_dev->image_format = 1;
4672
4673         dout("discovered version 1 image, header name is %s\n",
4674                 rbd_dev->header_name);
4675
4676         return 0;
4677
4678 out_err:
4679         kfree(rbd_dev->header_name);
4680         rbd_dev->header_name = NULL;
4681         kfree(rbd_dev->spec->image_id);
4682         rbd_dev->spec->image_id = NULL;
4683
4684         return ret;
4685 }
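
/*
 * Illustrative example: a format 1 image named "foo" has header
 * object "foo.rbd" (RBD_SUFFIX, defined in rbd_types.h, is ".rbd"),
 * so the header name built above is just the image name plus suffix.
 */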
4686
4687 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4688 {
4689         size_t size;
4690         int ret;
4691         u64 ver = 0;
4692
4693         /*
4694          * Image id was filled in by the caller.  Record the header
4695          * object name for this rbd image.
4696          */
4697         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4698         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4699         if (!rbd_dev->header_name)
4700                 return -ENOMEM;
4701         sprintf(rbd_dev->header_name, "%s%s",
4702                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4703
4704         /* Get the size and object order for the image */
4705         ret = rbd_dev_v2_image_size(rbd_dev);
4706         if (ret)
4707                 goto out_err;
4708
4709         /* Get the object prefix (a.k.a. block_name) for the image */
4710
4711         ret = rbd_dev_v2_object_prefix(rbd_dev);
4712         if (ret)
4713                 goto out_err;
4714
4715         /* Get and check features for the image */
4716
4717         ret = rbd_dev_v2_features(rbd_dev);
4718         if (ret)
4719                 goto out_err;
4720
4721         /* If the image supports layering, get the parent info */
4722
4723         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4724                 ret = rbd_dev_v2_parent_info(rbd_dev);
4725                 if (ret)
4726                         goto out_err;
4727         }
4728
4729         /* If the image supports fancy striping, get its parameters */
4730
4731         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4732                 ret = rbd_dev_v2_striping_info(rbd_dev);
4733                 if (ret < 0)
4734                         goto out_err;
4735         }
4736
4737         /* crypto and compression type aren't (yet) supported for v2 images */
4738
4739         rbd_dev->header.crypt_type = 0;
4740         rbd_dev->header.comp_type = 0;
4741
4742         /* Get the snapshot context, plus the header version */
4743
4744         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4745         if (ret)
4746                 goto out_err;
4747         rbd_dev->header.obj_version = ver;
4748
4749         rbd_dev->image_format = 2;
4750
4751         dout("discovered version 2 image, header name is %s\n",
4752                 rbd_dev->header_name);
4753
4754         return 0;
4755 out_err:
4756         rbd_dev->parent_overlap = 0;
4757         rbd_spec_put(rbd_dev->parent_spec);
4758         rbd_dev->parent_spec = NULL;
4759         kfree(rbd_dev->header_name);
4760         rbd_dev->header_name = NULL;
4761         kfree(rbd_dev->header.object_prefix);
4762         rbd_dev->header.object_prefix = NULL;
4763
4764         return ret;
4765 }
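
/*
 * Illustrative example: a format 2 image whose id is "1028b04567"
 * has header object "rbd_header.1028b04567" (RBD_HEADER_PREFIX,
 * defined in rbd_types.h, is "rbd_header."), independent of the
 * image's user-visible name.
 */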
4766
4767 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4768 {
4769         struct rbd_device *parent = NULL;
4770         struct rbd_spec *parent_spec = NULL;
4771         struct rbd_client *rbdc = NULL;
4772         int ret;
4773
4774         /* no need to lock here, as rbd_dev is not registered yet */
4775         ret = rbd_dev_snaps_update(rbd_dev);
4776         if (ret)
4777                 return ret;
4778
4779         ret = rbd_dev_probe_update_spec(rbd_dev);
4780         if (ret)
4781                 goto err_out_snaps;
4782
4783         ret = rbd_dev_set_mapping(rbd_dev);
4784         if (ret)
4785                 goto err_out_snaps;
4786
4787         /* generate unique id: find highest unique id, add one */
4788         rbd_dev_id_get(rbd_dev);
4789
4790         /* Fill in the device name, now that we have its id. */
4791         BUILD_BUG_ON(DEV_NAME_LEN
4792                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4793         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4794
4795         /* Get our block major device number. */
4796
4797         ret = register_blkdev(0, rbd_dev->name);
4798         if (ret < 0)
4799                 goto err_out_id;
4800         rbd_dev->major = ret;
4801
4802         /* Set up the blkdev mapping. */
4803
4804         ret = rbd_init_disk(rbd_dev);
4805         if (ret)
4806                 goto err_out_blkdev;
4807
4808         ret = rbd_bus_add_dev(rbd_dev);
4809         if (ret)
4810                 goto err_out_disk;
4811
4812         /*
4813          * At this point cleanup in the event of an error is the job
4814          * of the sysfs code (initiated by rbd_bus_del_dev()).
4815          */
4816         /* Probe the parent if there is one */
4817
4818         if (rbd_dev->parent_spec) {
4819                 /*
4820                  * We need to pass a reference to the client and the
4821                  * parent spec when creating the parent rbd_dev.
4822                  * Images related by parent/child relationships
4823                  * always share both.
4824                  */
4825                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4826                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4827
4828                 parent = rbd_dev_create(rbdc, parent_spec);
4829                 if (!parent) {
4830                         ret = -ENOMEM;
4831                         goto err_out_spec;
4832                 }
4833                 rbdc = NULL;            /* parent now owns reference */
4834                 parent_spec = NULL;     /* parent now owns reference */
4835                 ret = rbd_dev_probe(parent);
4836                 if (ret < 0)
4837                         goto err_out_parent;
4838                 rbd_dev->parent = parent;
4839         }
4840
4841         down_write(&rbd_dev->header_rwsem);
4842         ret = rbd_dev_snaps_register(rbd_dev);
4843         up_write(&rbd_dev->header_rwsem);
4844         if (ret)
4845                 goto err_out_bus;
4846
4847         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4848         if (ret)
4849                 goto err_out_bus;
4850
4851         /* Everything's ready.  Announce the disk to the world. */
4852
4853         add_disk(rbd_dev->disk);
4854
4855         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4856                 (unsigned long long) rbd_dev->mapping.size);
4857
4858         return ret;
4859
4860 err_out_parent:
4861         rbd_dev_destroy(parent);
4862 err_out_spec:
4863         rbd_spec_put(parent_spec);
4864         rbd_put_client(rbdc);
4865 err_out_bus:
4866         /* this will also clean up the rest of the rbd_dev state */
4867
4868         rbd_bus_del_dev(rbd_dev);
4869
4870         return ret;
4871 err_out_disk:
4872         rbd_free_disk(rbd_dev);
4873 err_out_blkdev:
4874         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4875 err_out_id:
4876         rbd_dev_id_put(rbd_dev);
4877 err_out_snaps:
4878         rbd_remove_all_snaps(rbd_dev);
4879
4880         return ret;
4881 }
4882
4883 /*
4884  * Probe for the existence of the header object for the given rbd
4885  * device.  For format 2 images this includes determining the image
4886  * id.
4887  */
4888 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4889 {
4890         int ret;
4891
4892         /*
4893          * Get the id from the image id object.  If it's not a
4894          * format 2 image, we'll get ENOENT back, and we'll assume
4895          * it's a format 1 image.
4896          */
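        /*
         * Note: any nonzero return from rbd_dev_image_id(), not just
         * -ENOENT (e.g. -ENOMEM), takes the format 1 path below, so a
         * transient error shows up as a failed v1 probe rather than
         * as an explicit format check.
         */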
4897         ret = rbd_dev_image_id(rbd_dev);
4898         if (ret)
4899                 ret = rbd_dev_v1_probe(rbd_dev);
4900         else
4901                 ret = rbd_dev_v2_probe(rbd_dev);
4902         if (ret) {
4903                 dout("probe failed, returning %d\n", ret);
4904
4905                 return ret;
4906         }
4907
4908         ret = rbd_dev_probe_finish(rbd_dev);
4909         if (ret)
4910                 rbd_header_free(&rbd_dev->header);
4911
4912         return ret;
4913 }
4914
4915 static ssize_t rbd_add(struct bus_type *bus,
4916                        const char *buf,
4917                        size_t count)
4918 {
4919         struct rbd_device *rbd_dev = NULL;
4920         struct ceph_options *ceph_opts = NULL;
4921         struct rbd_options *rbd_opts = NULL;
4922         struct rbd_spec *spec = NULL;
4923         struct rbd_client *rbdc;
4924         struct ceph_osd_client *osdc;
4925         int rc = -ENOMEM;
4926
4927         if (!try_module_get(THIS_MODULE))
4928                 return -ENODEV;
4929
4930         /* parse add command */
4931         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4932         if (rc < 0)
4933                 goto err_out_module;
4934
4935         rbdc = rbd_get_client(ceph_opts);
4936         if (IS_ERR(rbdc)) {
4937                 rc = PTR_ERR(rbdc);
4938                 goto err_out_args;
4939         }
4940         ceph_opts = NULL;       /* rbd_dev client now owns this */
4941
4942         /* pick the pool */
4943         osdc = &rbdc->client->osdc;
4944         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4945         if (rc < 0)
4946                 goto err_out_client;
4947         spec->pool_id = (u64) rc;
4948
4949         /* The ceph file layout needs to fit pool id in 32 bits */
4950
4951         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4952                 rc = -EIO;
4953                 goto err_out_client;
4954         }
4955
4956         rbd_dev = rbd_dev_create(rbdc, spec);
4957         if (!rbd_dev)
4958                 goto err_out_client;
4959         rbdc = NULL;            /* rbd_dev now owns this */
4960         spec = NULL;            /* rbd_dev now owns this */
4961
4962         rbd_dev->mapping.read_only = rbd_opts->read_only;
4963         kfree(rbd_opts);
4964         rbd_opts = NULL;        /* done with this */
4965
4966         rc = rbd_dev_probe(rbd_dev);
4967         if (rc < 0)
4968                 goto err_out_rbd_dev;
4969
4970         return count;
4971 err_out_rbd_dev:
4972         rbd_dev_destroy(rbd_dev);
4973 err_out_client:
4974         rbd_put_client(rbdc);
4975 err_out_args:
4976         if (ceph_opts)
4977                 ceph_destroy_options(ceph_opts);
4978         kfree(rbd_opts);
4979         rbd_spec_put(spec);
4980 err_out_module:
4981         module_put(THIS_MODULE);
4982
4983         dout("Error adding device %s\n", buf);
4984
4985         return (ssize_t) rc;
4986 }
4987
4988 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4989 {
4990         struct list_head *tmp;
4991         struct rbd_device *rbd_dev;
4992
4993         spin_lock(&rbd_dev_list_lock);
4994         list_for_each(tmp, &rbd_dev_list) {
4995                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4996                 if (rbd_dev->dev_id == dev_id) {
4997                         spin_unlock(&rbd_dev_list_lock);
4998                         return rbd_dev;
4999                 }
5000         }
5001         spin_unlock(&rbd_dev_list_lock);
5002         return NULL;
5003 }
5004
5005 static void rbd_dev_release(struct device *dev)
5006 {
5007         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5008
5009         if (rbd_dev->watch_event)
5010                 rbd_dev_header_watch_sync(rbd_dev, 0);
5011
5012         /* clean up and free blkdev */
5013         rbd_free_disk(rbd_dev);
5014         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5015
5016         /* release allocated disk header fields */
5017         rbd_header_free(&rbd_dev->header);
5018
5019         /* done with the id, and with the rbd_dev */
5020         rbd_dev_id_put(rbd_dev);
5021         rbd_assert(rbd_dev->rbd_client != NULL);
5022         rbd_dev_destroy(rbd_dev);
5023
5024         /* release module ref */
5025         module_put(THIS_MODULE);
5026 }
5027
5028 static void __rbd_remove(struct rbd_device *rbd_dev)
5029 {
5030         rbd_remove_all_snaps(rbd_dev);
5031         rbd_bus_del_dev(rbd_dev);
5032 }
5033
5034 static ssize_t rbd_remove(struct bus_type *bus,
5035                           const char *buf,
5036                           size_t count)
5037 {
5038         struct rbd_device *rbd_dev = NULL;
5039         int target_id, rc;
5040         unsigned long ul;
5041         int ret = count;
5042
5043         rc = strict_strtoul(buf, 10, &ul);
5044         if (rc)
5045                 return rc;
5046
5047         /* convert to int; abort if we lost anything in the conversion */
5048         target_id = (int) ul;
5049         if (target_id != ul)
5050                 return -EINVAL;
5051
5052         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5053
5054         rbd_dev = __rbd_get_dev(target_id);
5055         if (!rbd_dev) {
5056                 ret = -ENOENT;
5057                 goto done;
5058         }
5059
5060         spin_lock_irq(&rbd_dev->lock);
5061         if (rbd_dev->open_count)
5062                 ret = -EBUSY;
5063         else
5064                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5065         spin_unlock_irq(&rbd_dev->lock);
5066         if (ret < 0)
5067                 goto done;
5068
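        /*
         * Illustrative example: for a chain in which the mapped
         * device has a parent and a grandparent, the loop below
         * removes the grandparent first, then the parent; the mapped
         * device itself is removed after the loop.
         */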
5069         while (rbd_dev->parent_spec) {
5070                 struct rbd_device *first = rbd_dev;
5071                 struct rbd_device *second = first->parent;
5072                 struct rbd_device *third;
5073
5074                 /*
5075                  * Follow to the parent with no grandparent and
5076                  * remove it.
5077                  */
5078                 while (second && (third = second->parent)) {
5079                         first = second;
5080                         second = third;
5081                 }
5082                 __rbd_remove(second);
5083                 rbd_spec_put(first->parent_spec);
5084                 first->parent_spec = NULL;
5085                 first->parent_overlap = 0;
5086                 first->parent = NULL;
5087         }
5088         __rbd_remove(rbd_dev);
5089
5090 done:
5091         mutex_unlock(&ctl_mutex);
5092
5093         return ret;
5094 }
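
/*
 * Illustrative example: writing "1" to /sys/bus/rbd/remove unmaps the
 * device with dev_id 1 (i.e. /dev/rbd1), provided it is not open; an
 * open device fails with -EBUSY.
 */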
5095
5096 /*
5097  * create control files in sysfs
5098  * /sys/bus/rbd/...
5099  */
5100 static int rbd_sysfs_init(void)
5101 {
5102         int ret;
5103
5104         ret = device_register(&rbd_root_dev);
5105         if (ret < 0)
5106                 return ret;
5107
5108         ret = bus_register(&rbd_bus_type);
5109         if (ret < 0)
5110                 device_unregister(&rbd_root_dev);
5111
5112         return ret;
5113 }
5114
5115 static void rbd_sysfs_cleanup(void)
5116 {
5117         bus_unregister(&rbd_bus_type);
5118         device_unregister(&rbd_root_dev);
5119 }
5120
5121 static int __init rbd_init(void)
5122 {
5123         int rc;
5124
5125         if (!libceph_compatible(NULL)) {
5126                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5127
5128                 return -EINVAL;
5129         }
5130         rc = rbd_sysfs_init();
5131         if (rc)
5132                 return rc;
5133         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5134         return 0;
5135 }
5136
5137 static void __exit rbd_exit(void)
5138 {
5139         rbd_sysfs_cleanup();
5140 }
5141
5142 module_init(rbd_init);
5143 module_exit(rbd_exit);
5144
5145 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5146 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5147 MODULE_DESCRIPTION("rados block device");
5148
5149 /* following authorship retained from original osdblk.c */
5150 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5151
5152 MODULE_LICENSE("GPL");