rbd: define zero_pages()
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

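/*
 * A worked bound for the limit above (a sketch, assuming the snapshot
 * context is encoded as a u64 seq, a u32 count, and one u64 id per
 * snapshot; the actual wire format may differ):
 *
 *      8 + 4 + 510 * 8 = 4092 bytes <= 4096
 *      8 + 4 + 511 * 8 = 4100 bytes >  4096
 *
 * so 510 is the largest snapshot count whose context still fits in a
 * single 4KB page.
 */
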
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

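/*
 * A sketch of where the width bound comes from (not from the original):
 * an int of sizeof (int) bytes has at most 8 * sizeof (int) * log10(2),
 * i.e. roughly 2.41 * sizeof (int), decimal digits, so
 * (5 * sizeof (int)) / 2 = 2.5 * sizeof (int) over-approximates the
 * digit count and the "+ 1" leaves room for a sign.  For 4-byte ints
 * this gives 11 characters, enough for "-2147483648".
 */
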
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;

        u64             snap_id;
        char            *snap_name;

        struct kref     kref;
};

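/*
 * A sketch (not from the original) of how the sharing rule above plays
 * out: a child image keeps its parent's identity alive simply by
 * holding an extra reference on the very same rbd_spec.  The helper
 * names here are hypothetical stand-ins for whatever reference
 * helpers the rest of the file provides.
 *
 *      struct rbd_spec *parent_spec = child_rbd_dev->parent_spec;
 *
 *      kref_get(&parent_spec->kref);           // child's reference
 *      ...
 *      kref_put(&parent_spec->kref, release);  // dropped at unmap time
 */
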
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

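/*
 * A sketch, not part of the original driver: the three cases the
 * comment in struct rbd_obj_request describes, restated as code.
 * Reading the obj_request union member is only meaningful once
 * which != BAD_WHICH has ruled out the image-data case.
 */
static inline const char *rbd_obj_request_kind(struct rbd_obj_request *obj_request)
{
        if (obj_request->which != BAD_WHICH)
                return "image data";      /* member of an image request */
        if (obj_request->obj_request)
                return "existence check"; /* STAT helper for a layered write */
        return "standalone";              /* e.g. watch or notify */
}
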
enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

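/*
 * A sketch, not part of the original driver, showing the intended use
 * of the iterators above: the plain form suits read-only walks (here,
 * summing per-object transfer counts), while the _safe form walks in
 * reverse and tolerates deletion, which suits teardown paths.
 */
static inline u64 rbd_img_request_bytes_done(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *obj_request;
        u64 xferred = 0;

        for_each_obj_request(img_request, obj_request)
                xferred += obj_request->xferred;

        return xferred;
}
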
struct rbd_snap {
        struct device           dev;
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with a specific address and configuration; if one
 * does not already exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Takes rbd_client_list_lock itself to remove the client from the
 * list, so the caller must not hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak object_prefix on this error path */
                        ret = -EIO;
                        goto out_err;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees the
                 * ondisk buffer we're working with has snap_names_len
                 * bytes beyond the end of the snapshot id array, so
                 * this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return ret;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->spec->snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        int ret;

        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

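/*
 * A worked example (a sketch, not part of the driver) of the segment
 * arithmetic above, using a hypothetical obj_order of 22 (4 MiB
 * objects):
 *
 *      offset = 6 MiB, length = 3 MiB
 *      segment           = offset >> 22           = 1
 *      offset-in-segment = offset & (4 MiB - 1)   = 2 MiB
 *      clamped length    = 4 MiB - 2 MiB          = 2 MiB
 *
 * so the first object request covers 2 MiB of segment 1, and the
 * remaining 1 MiB falls into segment 2 at offset 0.
 */
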
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

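/*
 * A userspace sketch (not part of the driver) of the zero_pages()
 * contract above, with a flat buffer standing in for the page array:
 * it zeros [offset, end) and leaves surrounding bytes intact, which
 * is exactly what the read callback relies on when it zero-fills
 * holes and short reads.
 *
 *      #include <assert.h>
 *      #include <string.h>
 *
 *      static void zero_range(unsigned char *buf, size_t offset, size_t end)
 *      {
 *              memset(buf + offset, 0, end - offset);
 *      }
 *
 *      int main(void)
 *      {
 *              unsigned char buf[16];
 *
 *              memset(buf, 0xaa, sizeof buf);
 *              zero_range(buf, 4, 12);
 *              assert(buf[3] == 0xaa && buf[4] == 0);
 *              assert(buf[11] == 0 && buf[12] == 0xaa);
 *              return 0;
 *      }
 */
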
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

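/*
 * A sketch (not from the original) of how a caller consumes the
 * in-out parameters above when carving one large request into
 * per-object clones; "next_len" would come from rbd_segment_length():
 *
 *      struct bio *bio_list = ...;     // source chain
 *      unsigned int bio_offset = 0;
 *
 *      while (more_segments) {
 *              struct bio *clone;
 *
 *              clone = bio_chain_clone_range(&bio_list, &bio_offset,
 *                                            next_len, GFP_ATOMIC);
 *              if (!clone)
 *                      goto out_err;
 *              // bio_list/bio_offset now point at the first un-cloned
 *              // byte, ready for the next segment's clone.
 *      }
 */
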
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case the stale "doesn't exist" response
 * is effectively ignored, because EXISTS, once set, is never cleared.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

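/*
 * A sketch (not from the original) of the out-of-order case described
 * above.  Two stat responses for the same object may land in either
 * order, but the flags converge on "known to exist" regardless:
 *
 *      obj_request_existence_set(req, true);   // "does exist" arrives
 *      obj_request_existence_set(req, false);  // stale "doesn't exist"
 *
 *      obj_request_known_test(req)   -> true
 *      obj_request_exists_test(req)  -> true   // EXISTS never cleared
 */
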
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

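/*
 * A sketch (not from the original) of the alternative mentioned in
 * the comment above: accumulate the aggregate transfer count as each
 * object request completes, instead of summing at the end.  This
 * assumes a hypothetical atomic64_t field, which the structure in
 * this file does not actually have:
 *
 *      static void rbd_img_account_xferred(struct rbd_img_request *img,
 *                                          u64 bytes)
 *      {
 *              atomic64_add(bytes, &img->xferred_atomic);
 *      }
 */
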
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

1417 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1418 {
1419         dout("%s: obj %p cb %p\n", __func__, obj_request,
1420                 obj_request->callback);
1421         if (obj_request->callback)
1422                 obj_request->callback(obj_request);
1423         else
1424                 complete_all(&obj_request->completion);
1425 }
1426
1427 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1428 {
1429         dout("%s: obj %p\n", __func__, obj_request);
1430         obj_request_done_set(obj_request);
1431 }
1432
1433 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1434 {
1435         struct rbd_img_request *img_request = NULL;
1436         bool layered = false;
1437
1438         if (obj_request_img_data_test(obj_request)) {
1439                 img_request = obj_request->img_request;
1440                 layered = img_request && img_request_layered_test(img_request);
1441         } else {
1442                 img_request = NULL;
1443                 layered = false;
1444         }
1445
1446         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1447                 obj_request, img_request, obj_request->result,
1448                 obj_request->xferred, obj_request->length);
1449         if (layered && obj_request->result == -ENOENT)
1450                 rbd_img_parent_read(obj_request);
1451         else if (img_request)
1452                 rbd_img_obj_request_read_callback(obj_request);
1453         else
1454                 obj_request_done_set(obj_request);
1455 }
1456
1457 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1458 {
1459         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1460                 obj_request->result, obj_request->length);
1461         /*
1462          * There is no such thing as a successful short write.  Set
1463          * it to our originally-requested length.
1464          */
1465         obj_request->xferred = obj_request->length;
1466         obj_request_done_set(obj_request);
1467 }
1468
1469 /*
1470  * For a simple stat call there's nothing to do.  We'll do more if
1471  * this is part of a write sequence for a layered image.
1472  */
1473 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1474 {
1475         dout("%s: obj %p\n", __func__, obj_request);
1476         obj_request_done_set(obj_request);
1477 }
1478
1479 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1480                                 struct ceph_msg *msg)
1481 {
1482         struct rbd_obj_request *obj_request = osd_req->r_priv;
1483         u16 opcode;
1484
1485         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1486         rbd_assert(osd_req == obj_request->osd_req);
1487         if (obj_request_img_data_test(obj_request)) {
1488                 rbd_assert(obj_request->img_request);
1489                 rbd_assert(obj_request->which != BAD_WHICH);
1490         } else {
1491                 rbd_assert(obj_request->which == BAD_WHICH);
1492         }
1493
1494         if (osd_req->r_result < 0)
1495                 obj_request->result = osd_req->r_result;
1496         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1497
1498         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1499
1500         /*
1501          * We support a 64-bit length, but ultimately it has to be
1502          * passed to blk_end_request(), which takes an unsigned int.
1503          */
1504         obj_request->xferred = osd_req->r_reply_op_len[0];
1505         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1506         opcode = osd_req->r_ops[0].op;
1507         switch (opcode) {
1508         case CEPH_OSD_OP_READ:
1509                 rbd_osd_read_callback(obj_request);
1510                 break;
1511         case CEPH_OSD_OP_WRITE:
1512                 rbd_osd_write_callback(obj_request);
1513                 break;
1514         case CEPH_OSD_OP_STAT:
1515                 rbd_osd_stat_callback(obj_request);
1516                 break;
1517         case CEPH_OSD_OP_CALL:
1518         case CEPH_OSD_OP_NOTIFY_ACK:
1519         case CEPH_OSD_OP_WATCH:
1520                 rbd_osd_trivial_callback(obj_request);
1521                 break;
1522         default:
1523                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1524                         obj_request->object_name, (unsigned short) opcode);
1525                 break;
1526         }
1527
1528         if (obj_request_done_test(obj_request))
1529                 rbd_obj_request_complete(obj_request);
1530 }
1531
1532 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1533 {
1534         struct rbd_img_request *img_request = obj_request->img_request;
1535         struct ceph_osd_request *osd_req = obj_request->osd_req;
1536         u64 snap_id;
1537
1538         rbd_assert(osd_req != NULL);
1539
1540         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1541         ceph_osdc_build_request(osd_req, obj_request->offset,
1542                         NULL, snap_id, NULL);
1543 }
1544
1545 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1546 {
1547         struct rbd_img_request *img_request = obj_request->img_request;
1548         struct ceph_osd_request *osd_req = obj_request->osd_req;
1549         struct ceph_snap_context *snapc;
1550         struct timespec mtime = CURRENT_TIME;
1551
1552         rbd_assert(osd_req != NULL);
1553
1554         snapc = img_request ? img_request->snapc : NULL;
1555         ceph_osdc_build_request(osd_req, obj_request->offset,
1556                         snapc, CEPH_NOSNAP, &mtime);
1557 }
1558
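     /*
      * A sketch of how these single-op requests are typically put
      * together (rbd_img_request_fill_bio() below is the real thing;
      * all of the names used here appear elsewhere in this file):
      *
      *      obj_request = rbd_obj_request_create(name, off, len, type);
      *      obj_request->osd_req = rbd_osd_req_create(rbd_dev,
      *                                      write_request, obj_request);
      *      osd_req_op_extent_init(obj_request->osd_req, 0, opcode,
      *                                      off, len, 0, 0);
      *      rbd_osd_req_format_write(obj_request);  (or ..._read())
      *      rbd_obj_request_submit(osdc, obj_request);
      */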
1559 static struct ceph_osd_request *rbd_osd_req_create(
1560                                         struct rbd_device *rbd_dev,
1561                                         bool write_request,
1562                                         struct rbd_obj_request *obj_request)
1563 {
1564         struct ceph_snap_context *snapc = NULL;
1565         struct ceph_osd_client *osdc;
1566         struct ceph_osd_request *osd_req;
1567
1568         if (obj_request_img_data_test(obj_request)) {
1569                 struct rbd_img_request *img_request = obj_request->img_request;
1570
1571                 rbd_assert(write_request ==
1572                                 img_request_write_test(img_request));
1573                 if (write_request)
1574                         snapc = img_request->snapc;
1575         }
1576
1577         /* Allocate and initialize the request, for the single op */
1578
1579         osdc = &rbd_dev->rbd_client->client->osdc;
1580         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1581         if (!osd_req)
1582                 return NULL;    /* ENOMEM */
1583
1584         if (write_request)
1585                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1586         else
1587                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1588
1589         osd_req->r_callback = rbd_osd_req_callback;
1590         osd_req->r_priv = obj_request;
1591
1592         osd_req->r_oid_len = strlen(obj_request->object_name);
1593         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1594         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1595
1596         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1597
1598         return osd_req;
1599 }
1600
1601 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1602 {
1603         ceph_osdc_put_request(osd_req);
1604 }
1605
1606 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1607
1608 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1609                                                 u64 offset, u64 length,
1610                                                 enum obj_request_type type)
1611 {
1612         struct rbd_obj_request *obj_request;
1613         size_t size;
1614         char *name;
1615
1616         rbd_assert(obj_request_type_valid(type));
1617
1618         size = strlen(object_name) + 1;
1619         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1620         if (!obj_request)
1621                 return NULL;
1622
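             /*
              * The name is carved out of the same allocation as the
              * request structure itself (just past it), so the single
              * kfree() in rbd_obj_request_destroy() releases both.
              */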
1623         name = (char *)(obj_request + 1);
1624         obj_request->object_name = memcpy(name, object_name, size);
1625         obj_request->offset = offset;
1626         obj_request->length = length;
1627         obj_request->flags = 0;
1628         obj_request->which = BAD_WHICH;
1629         obj_request->type = type;
1630         INIT_LIST_HEAD(&obj_request->links);
1631         init_completion(&obj_request->completion);
1632         kref_init(&obj_request->kref);
1633
1634         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1635                 offset, length, (int)type, obj_request);
1636
1637         return obj_request;
1638 }
1639
1640 static void rbd_obj_request_destroy(struct kref *kref)
1641 {
1642         struct rbd_obj_request *obj_request;
1643
1644         obj_request = container_of(kref, struct rbd_obj_request, kref);
1645
1646         dout("%s: obj %p\n", __func__, obj_request);
1647
1648         rbd_assert(obj_request->img_request == NULL);
1649         rbd_assert(obj_request->which == BAD_WHICH);
1650
1651         if (obj_request->osd_req)
1652                 rbd_osd_req_destroy(obj_request->osd_req);
1653
1654         rbd_assert(obj_request_type_valid(obj_request->type));
1655         switch (obj_request->type) {
1656         case OBJ_REQUEST_NODATA:
1657                 break;          /* Nothing to do */
1658         case OBJ_REQUEST_BIO:
1659                 if (obj_request->bio_list)
1660                         bio_chain_put(obj_request->bio_list);
1661                 break;
1662         case OBJ_REQUEST_PAGES:
1663                 if (obj_request->pages)
1664                         ceph_release_page_vector(obj_request->pages,
1665                                                 obj_request->page_count);
1666                 break;
1667         }
1668
1669         kfree(obj_request);
1670 }
1671
1672 /*
1673  * Caller is responsible for filling in the list of object requests
1674  * that comprises the image request, and the Linux request pointer
1675  * (if there is one).
1676  */
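     /*
      * A rough sketch of the calling sequence (rbd_request_fn() below
      * is the canonical user):
      *
      *      img_request = rbd_img_request_create(rbd_dev, offset,
      *                                      length, write_request, false);
      *      img_request->rq = rq;
      *      rbd_img_request_fill_bio(img_request, rq->bio);
      *      rbd_img_request_submit(img_request);
      *      (and rbd_img_request_put(img_request) on failure)
      */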
1677 static struct rbd_img_request *rbd_img_request_create(
1678                                         struct rbd_device *rbd_dev,
1679                                         u64 offset, u64 length,
1680                                         bool write_request,
1681                                         bool child_request)
1682 {
1683         struct rbd_img_request *img_request;
1684         struct ceph_snap_context *snapc = NULL;
1685
1686         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1687         if (!img_request)
1688                 return NULL;
1689
1690         if (write_request) {
1691                 down_read(&rbd_dev->header_rwsem);
1692                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1693                 up_read(&rbd_dev->header_rwsem);
1694                 if (WARN_ON(!snapc)) {
1695                         kfree(img_request);
1696                         return NULL;    /* Shouldn't happen */
1697                 }
1698
1699         }
1700
1701         img_request->rq = NULL;
1702         img_request->rbd_dev = rbd_dev;
1703         img_request->offset = offset;
1704         img_request->length = length;
1705         img_request->flags = 0;
1706         if (write_request) {
1707                 img_request_write_set(img_request);
1708                 img_request->snapc = snapc;
1709         } else {
1710                 img_request->snap_id = rbd_dev->spec->snap_id;
1711         }
1712         if (child_request)
1713                 img_request_child_set(img_request);
1714         if (rbd_dev->parent_spec)
1715                 img_request_layered_set(img_request);
1716         spin_lock_init(&img_request->completion_lock);
1717         img_request->next_completion = 0;
1718         img_request->callback = NULL;
1719         img_request->result = 0;
1720         img_request->obj_request_count = 0;
1721         INIT_LIST_HEAD(&img_request->obj_requests);
1722         kref_init(&img_request->kref);
1723
1724         rbd_img_request_get(img_request);       /* Avoid a warning */
1725         rbd_img_request_put(img_request);       /* TEMPORARY */
1726
1727         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1728                 write_request ? "write" : "read", offset, length,
1729                 img_request);
1730
1731         return img_request;
1732 }
1733
1734 static void rbd_img_request_destroy(struct kref *kref)
1735 {
1736         struct rbd_img_request *img_request;
1737         struct rbd_obj_request *obj_request;
1738         struct rbd_obj_request *next_obj_request;
1739
1740         img_request = container_of(kref, struct rbd_img_request, kref);
1741
1742         dout("%s: img %p\n", __func__, img_request);
1743
1744         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1745                 rbd_img_obj_request_del(img_request, obj_request);
1746         rbd_assert(img_request->obj_request_count == 0);
1747
1748         if (img_request_write_test(img_request))
1749                 ceph_put_snap_context(img_request->snapc);
1750
1751         if (img_request_child_test(img_request))
1752                 rbd_obj_request_put(img_request->obj_request);
1753
1754         kfree(img_request);
1755 }
1756
1757 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1758 {
1759         struct rbd_img_request *img_request;
1760         unsigned int xferred;
1761         int result;
1762         bool more;
1763
1764         rbd_assert(obj_request_img_data_test(obj_request));
1765         img_request = obj_request->img_request;
1766
1767         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1768         xferred = (unsigned int)obj_request->xferred;
1769         result = obj_request->result;
1770         if (result) {
1771                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1772
1773                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1774                         img_request_write_test(img_request) ? "write" : "read",
1775                         obj_request->length, obj_request->img_offset,
1776                         obj_request->offset);
1777                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1778                         result, xferred);
1779                 if (!img_request->result)
1780                         img_request->result = result;
1781         }
1782
1783         if (img_request_child_test(img_request)) {
1784                 rbd_assert(img_request->obj_request != NULL);
1785                 more = obj_request->which < img_request->obj_request_count - 1;
1786         } else {
1787                 rbd_assert(img_request->rq != NULL);
1788                 more = blk_end_request(img_request->rq, result, xferred);
1789         }
1790
1791         return more;
1792 }
1793
1794 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1795 {
1796         struct rbd_img_request *img_request;
1797         u32 which = obj_request->which;
1798         bool more = true;
1799
1800         rbd_assert(obj_request_img_data_test(obj_request));
1801         img_request = obj_request->img_request;
1802
1803         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1804         rbd_assert(img_request != NULL);
1805         rbd_assert(img_request->obj_request_count > 0);
1806         rbd_assert(which != BAD_WHICH);
1807         rbd_assert(which < img_request->obj_request_count);
1808         rbd_assert(which >= img_request->next_completion);
1809
1810         spin_lock_irq(&img_request->completion_lock);
1811         if (which != img_request->next_completion)
1812                 goto out;
1813
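             /*
              * Object requests may complete in any order, but the block
              * layer must see completions in order.  Walk forward from
              * the oldest outstanding request, ending each contiguous
              * completed request, and stop at the first one that is
              * still in flight.
              */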
1814         for_each_obj_request_from(img_request, obj_request) {
1815                 rbd_assert(more);
1816                 rbd_assert(which < img_request->obj_request_count);
1817
1818                 if (!obj_request_done_test(obj_request))
1819                         break;
1820                 more = rbd_img_obj_end_request(obj_request);
1821                 which++;
1822         }
1823
1824         rbd_assert(more ^ (which == img_request->obj_request_count));
1825         img_request->next_completion = which;
1826 out:
1827         spin_unlock_irq(&img_request->completion_lock);
1828
1829         if (!more)
1830                 rbd_img_request_complete(img_request);
1831 }
1832
1833 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1834                                         struct bio *bio_list)
1835 {
1836         struct rbd_device *rbd_dev = img_request->rbd_dev;
1837         struct rbd_obj_request *obj_request = NULL;
1838         struct rbd_obj_request *next_obj_request;
1839         bool write_request = img_request_write_test(img_request);
1840         unsigned int bio_offset;
1841         u64 img_offset;
1842         u64 resid;
1843         u16 opcode;
1844
1845         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1846
1847         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1848         bio_offset = 0;
1849         img_offset = img_request->offset;
1850         rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1851         resid = img_request->length;
1852         rbd_assert(resid > 0);
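             /*
              * Each pass through the loop below peels off the part of the
              * request that falls within a single rbd object.  For example,
              * assuming the default 4 MiB objects (obj_order 22), a 6 MiB
              * request starting at image offset 3 MiB would be split into
              * three object requests of 1 MiB, 4 MiB, and 1 MiB.
              */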
1853         while (resid) {
1854                 struct ceph_osd_request *osd_req;
1855                 const char *object_name;
1856                 unsigned int clone_size;
1857                 u64 offset;
1858                 u64 length;
1859
1860                 object_name = rbd_segment_name(rbd_dev, img_offset);
1861                 if (!object_name)
1862                         goto out_unwind;
1863                 offset = rbd_segment_offset(rbd_dev, img_offset);
1864                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1865                 obj_request = rbd_obj_request_create(object_name,
1866                                                 offset, length,
1867                                                 OBJ_REQUEST_BIO);
1868                 kfree(object_name);     /* object request has its own copy */
1869                 if (!obj_request)
1870                         goto out_unwind;
1871
1872                 rbd_assert(length <= (u64) UINT_MAX);
1873                 clone_size = (unsigned int) length;
1874                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1875                                                 &bio_offset, clone_size,
1876                                                 GFP_ATOMIC);
1877                 if (!obj_request->bio_list)
1878                         goto out_partial;
1879
1880                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1881                                                 obj_request);
1882                 if (!osd_req)
1883                         goto out_partial;
1884                 obj_request->osd_req = osd_req;
1885                 obj_request->callback = rbd_img_obj_callback;
1886
1887                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1888                                                 0, 0);
1889                 osd_req_op_extent_osd_data_bio(osd_req, 0,
1890                                 obj_request->bio_list, obj_request->length);
1891
1892                 if (write_request)
1893                         rbd_osd_req_format_write(obj_request);
1894                 else
1895                         rbd_osd_req_format_read(obj_request);
1896
1897                 obj_request->img_offset = img_offset;
1898                 rbd_img_obj_request_add(img_request, obj_request);
1899
1900                 img_offset += length;
1901                 resid -= length;
1902         }
1903
1904         return 0;
1905
1906 out_partial:
1907         rbd_obj_request_put(obj_request);
1908 out_unwind:
1909         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1910                 rbd_obj_request_put(obj_request);
1911
1912         return -ENOMEM;
1913 }
1914
1915 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
1916 {
1917         struct rbd_obj_request *orig_request;
1918         int result;
1919
1920         rbd_assert(!obj_request_img_data_test(obj_request));
1921
1922         /*
1923          * All we need from the object request is the original
1924          * request and the result of the STAT op.  Grab those, then
1925          * we're done with the request.
1926          */
1927         orig_request = obj_request->obj_request;
1928         obj_request->obj_request = NULL;
1929         rbd_assert(orig_request);
1930         rbd_assert(orig_request->img_request);
1931
1932         result = obj_request->result;
1933         obj_request->result = 0;
1934
1935         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
1936                 obj_request, orig_request, result,
1937                 obj_request->xferred, obj_request->length);
1938         rbd_obj_request_put(obj_request);
1939
1940         rbd_assert(orig_request);
1941         rbd_assert(orig_request->img_request);
1942
1943         /*
1944          * Our only purpose here is to determine whether the object
1945          * exists, and we don't want to treat the non-existence as
1946          * an error.  If something else comes back, transfer the
1947          * error to the original request and complete it now.
1948          */
1949         if (!result) {
1950                 obj_request_existence_set(orig_request, true);
1951         } else if (result == -ENOENT) {
1952                 obj_request_existence_set(orig_request, false);
1953         } else {
1954                 orig_request->result = result;
1955                 goto out_err;
1956         }
1957
1958         /*
1959          * Resubmit the original request now that we have recorded
1960          * whether the target object exists.
1961          */
1962         orig_request->result = rbd_img_obj_request_submit(orig_request);
1963 out_err:
1964         if (orig_request->result)
1965                 rbd_obj_request_complete(orig_request);
1966         rbd_obj_request_put(orig_request);
1967 }
1968
1969 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
1970 {
1971         struct rbd_obj_request *stat_request;
1972         struct rbd_device *rbd_dev;
1973         struct ceph_osd_client *osdc;
1974         struct page **pages = NULL;
1975         u32 page_count;
1976         size_t size;
1977         int ret;
1978
1979         /*
1980          * The response data for a STAT call consists of:
1981          *     le64 length;
1982          *     struct {
1983          *         le32 tv_sec;
1984          *         le32 tv_nsec;
1985          *     } mtime;
1986          */
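             /*
              * That works out to 16 bytes, so one page is always enough;
              * calc_pages_for(0, 16) below yields page_count == 1.
              */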
1987         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
1988         page_count = (u32)calc_pages_for(0, size);
1989         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1990         if (IS_ERR(pages))
1991                 return PTR_ERR(pages);
1992
1993         ret = -ENOMEM;
1994         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
1995                                                         OBJ_REQUEST_PAGES);
1996         if (!stat_request)
1997                 goto out;
1998
1999         rbd_obj_request_get(obj_request);
2000         stat_request->obj_request = obj_request;
2001         stat_request->pages = pages;
2002         stat_request->page_count = page_count;
2003
2004         rbd_assert(obj_request->img_request);
2005         rbd_dev = obj_request->img_request->rbd_dev;
2006         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2007                                                 stat_request);
2008         if (!stat_request->osd_req)
2009                 goto out;
2010         stat_request->callback = rbd_img_obj_exists_callback;
2011
2012         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2013         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2014                                         false, false);
2015         rbd_osd_req_format_read(stat_request);
2016
2017         osdc = &rbd_dev->rbd_client->client->osdc;
2018         ret = rbd_obj_request_submit(osdc, stat_request);
2019 out:
2020         if (ret)
2021                 rbd_obj_request_put(obj_request);
2022
2023         return ret;
2024 }
2025
2026 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2027 {
2028         struct rbd_img_request *img_request;
2029
2030         rbd_assert(obj_request_img_data_test(obj_request));
2031
2032         img_request = obj_request->img_request;
2033         rbd_assert(img_request);
2034
2035         /* (At the moment we don't care whether it exists or not...) */
2036         (void) obj_request_exists_test;
2037
2038         /*
2039          * Only layered writes need special handling.  If it's not a
2040          * layered write, or it is a layered write but we know the
2041          * target object exists, it's no different from any other
2042          * object request.
2043          */
2044         if (!img_request_write_test(img_request) ||
2045                 !img_request_layered_test(img_request) ||
2046                 obj_request_known_test(obj_request)) {
2047
2048                 struct rbd_device *rbd_dev;
2049                 struct ceph_osd_client *osdc;
2050
2051                 rbd_dev = obj_request->img_request->rbd_dev;
2052                 osdc = &rbd_dev->rbd_client->client->osdc;
2053
2054                 return rbd_obj_request_submit(osdc, obj_request);
2055         }
2056
2057         /*
2058          * It's a layered write and we don't know whether the target
2059          * exists.  Issue existence check; once that completes the
2060          * original request will be submitted again.
2061          */
2062
2063         return rbd_img_obj_exists_submit(obj_request);
2064 }
2065
2066 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2067 {
2068         struct rbd_obj_request *obj_request;
2069         struct rbd_obj_request *next_obj_request;
2070
2071         dout("%s: img %p\n", __func__, img_request);
2072         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2073                 int ret;
2074
2075                 ret = rbd_img_obj_request_submit(obj_request);
2076                 if (ret)
2077                         return ret;
2078         }
2079
2080         return 0;
2081 }
2082
2083 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2084 {
2085         struct rbd_obj_request *obj_request;
2086
2087         rbd_assert(img_request_child_test(img_request));
2088
2089         obj_request = img_request->obj_request;
2090         rbd_assert(obj_request != NULL);
2091         obj_request->result = img_request->result;
2092         obj_request->xferred = img_request->xferred;
2093
2094         rbd_img_obj_request_read_callback(obj_request);
2095         rbd_obj_request_complete(obj_request);
2096 }
2097
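     /*
      * A read of an object in a layered image came back -ENOENT,
      * meaning the object has never been written in this image and
      * its data (if any) must come from the parent.  Build a child
      * image request against the parent for the same image extent
      * and submit it.
      */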
2098 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2099 {
2100         struct rbd_device *rbd_dev;
2101         struct rbd_img_request *img_request;
2102         int result;
2103
2104         rbd_assert(obj_request_img_data_test(obj_request));
2105         rbd_assert(obj_request->img_request != NULL);
2106         rbd_assert(obj_request->result == (s32) -ENOENT);
2107         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2108
2109         rbd_dev = obj_request->img_request->rbd_dev;
2110         rbd_assert(rbd_dev->parent != NULL);
2111         /* rbd_read_finish(obj_request, obj_request->length); */
2112         img_request = rbd_img_request_create(rbd_dev->parent,
2113                                                 obj_request->img_offset,
2114                                                 obj_request->length,
2115                                                 false, true);
2116         result = -ENOMEM;
2117         if (!img_request)
2118                 goto out_err;
2119
2120         rbd_obj_request_get(obj_request);
2121         img_request->obj_request = obj_request;
2122
2123         result = rbd_img_request_fill_bio(img_request, obj_request->bio_list);
2124         if (result)
2125                 goto out_err;
2126
2127         img_request->callback = rbd_img_parent_read_callback;
2128         result = rbd_img_request_submit(img_request);
2129         if (result)
2130                 goto out_err;
2131
2132         return;
2133 out_err:
2134         if (img_request)
2135                 rbd_img_request_put(img_request);
2136         obj_request->result = result;
2137         obj_request->xferred = 0;
2138         obj_request_done_set(obj_request);
2139 }
2140
2141 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2142                                    u64 ver, u64 notify_id)
2143 {
2144         struct rbd_obj_request *obj_request;
2145         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2146         int ret;
2147
2148         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2149                                                         OBJ_REQUEST_NODATA);
2150         if (!obj_request)
2151                 return -ENOMEM;
2152
2153         ret = -ENOMEM;
2154         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2155         if (!obj_request->osd_req)
2156                 goto out;
2157         obj_request->callback = rbd_obj_request_put;
2158
2159         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2160                                         notify_id, ver, 0);
2161         rbd_osd_req_format_read(obj_request);
2162
2163         ret = rbd_obj_request_submit(osdc, obj_request);
2164 out:
2165         if (ret)
2166                 rbd_obj_request_put(obj_request);
2167
2168         return ret;
2169 }
2170
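     /*
      * Called when the header object we are watching changes (for
      * example when a snapshot is created or the image is resized).
      * Refresh our in-core view of the header, then acknowledge the
      * notification so the osd knows it was received.
      */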
2171 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2172 {
2173         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2174         u64 hver;
2175         int rc;
2176
2177         if (!rbd_dev)
2178                 return;
2179
2180         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2181                 rbd_dev->header_name, (unsigned long long) notify_id,
2182                 (unsigned int) opcode);
2183         rc = rbd_dev_refresh(rbd_dev, &hver);
2184         if (rc)
2185                 rbd_warn(rbd_dev, "got notification but failed to "
2186                            "update snaps: %d\n", rc);
2187
2188         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2189 }
2190
2191 /*
2192  * Request sync osd watch/unwatch.  The value of "start" determines
2193  * whether a watch request is being initiated or torn down.
2194  */
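     /*
      * A sketch of the expected usage ("start" acts as a boolean):
      *
      *      rbd_dev_header_watch_sync(rbd_dev, 1);  <-- register watch
      *      ...
      *      rbd_dev_header_watch_sync(rbd_dev, 0);  <-- tear it down
      */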
2195 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2196 {
2197         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2198         struct rbd_obj_request *obj_request;
2199         int ret;
2200
2201         rbd_assert(start ^ !!rbd_dev->watch_event);
2202         rbd_assert(start ^ !!rbd_dev->watch_request);
2203
2204         if (start) {
2205                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2206                                                 &rbd_dev->watch_event);
2207                 if (ret < 0)
2208                         return ret;
2209                 rbd_assert(rbd_dev->watch_event != NULL);
2210         }
2211
2212         ret = -ENOMEM;
2213         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2214                                                         OBJ_REQUEST_NODATA);
2215         if (!obj_request)
2216                 goto out_cancel;
2217
2218         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2219         if (!obj_request->osd_req)
2220                 goto out_cancel;
2221
2222         if (start)
2223                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2224         else
2225                 ceph_osdc_unregister_linger_request(osdc,
2226                                         rbd_dev->watch_request->osd_req);
2227
2228         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2229                                 rbd_dev->watch_event->cookie,
2230                                 rbd_dev->header.obj_version, start);
2231         rbd_osd_req_format_write(obj_request);
2232
2233         ret = rbd_obj_request_submit(osdc, obj_request);
2234         if (ret)
2235                 goto out_cancel;
2236         ret = rbd_obj_request_wait(obj_request);
2237         if (ret)
2238                 goto out_cancel;
2239         ret = obj_request->result;
2240         if (ret)
2241                 goto out_cancel;
2242
2243         /*
2244          * A watch request is set to linger, so the underlying osd
2245          * request won't go away until we unregister it.  We retain
2246          * a pointer to the object request during that time (in
2247          * rbd_dev->watch_request), so we'll keep a reference to
2248          * it.  We'll drop that reference (below) after we've
2249          * unregistered it.
2250          */
2251         if (start) {
2252                 rbd_dev->watch_request = obj_request;
2253
2254                 return 0;
2255         }
2256
2257         /* We have successfully torn down the watch request */
2258
2259         rbd_obj_request_put(rbd_dev->watch_request);
2260         rbd_dev->watch_request = NULL;
2261 out_cancel:
2262         /* Cancel the event if we're tearing down, or on error */
2263         ceph_osdc_cancel_event(rbd_dev->watch_event);
2264         rbd_dev->watch_event = NULL;
2265         if (obj_request)
2266                 rbd_obj_request_put(obj_request);
2267
2268         return ret;
2269 }
2270
2271 /*
2272  * Synchronous osd object method call
2273  */
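     /*
      * As a usage sketch: the format 2 header code elsewhere in this
      * driver issues calls along these lines (buffer declarations
      * elided):
      *
      *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
      *                      "rbd", "get_size",
      *                      (char *) &snapid, sizeof (snapid),
      *                      (char *) &size_buf, sizeof (size_buf), NULL);
      */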
2274 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2275                              const char *object_name,
2276                              const char *class_name,
2277                              const char *method_name,
2278                              const char *outbound,
2279                              size_t outbound_size,
2280                              char *inbound,
2281                              size_t inbound_size,
2282                              u64 *version)
2283 {
2284         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2285         struct rbd_obj_request *obj_request;
2286         struct page **pages;
2287         u32 page_count;
2288         int ret;
2289
2290         /*
2291          * Method calls are ultimately read operations.  The result
2292          * should be placed into the inbound buffer provided.  They
2293          * also supply outbound data--parameters for the object
2294          * method.  Currently if this is present it will be a
2295          * snapshot id.
2296          */
2297         page_count = (u32) calc_pages_for(0, inbound_size);
2298         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2299         if (IS_ERR(pages))
2300                 return PTR_ERR(pages);
2301
2302         ret = -ENOMEM;
2303         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2304                                                         OBJ_REQUEST_PAGES);
2305         if (!obj_request)
2306                 goto out;
2307
2308         obj_request->pages = pages;
2309         obj_request->page_count = page_count;
2310
2311         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2312         if (!obj_request->osd_req)
2313                 goto out;
2314
2315         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2316                                         class_name, method_name);
2317         if (outbound_size) {
2318                 struct ceph_pagelist *pagelist;
2319
2320                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2321                 if (!pagelist)
2322                         goto out;
2323
2324                 ceph_pagelist_init(pagelist);
2325                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2326                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2327                                                 pagelist);
2328         }
2329         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2330                                         obj_request->pages, inbound_size,
2331                                         0, false, false);
2332         rbd_osd_req_format_read(obj_request);
2333
2334         ret = rbd_obj_request_submit(osdc, obj_request);
2335         if (ret)
2336                 goto out;
2337         ret = rbd_obj_request_wait(obj_request);
2338         if (ret)
2339                 goto out;
2340
2341         ret = obj_request->result;
2342         if (ret < 0)
2343                 goto out;
2344         ret = 0;
2345         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2346         if (version)
2347                 *version = obj_request->version;
2348 out:
2349         if (obj_request)
2350                 rbd_obj_request_put(obj_request);
2351         else
2352                 ceph_release_page_vector(pages, page_count);
2353
2354         return ret;
2355 }
2356
2357 static void rbd_request_fn(struct request_queue *q)
2358                 __releases(q->queue_lock) __acquires(q->queue_lock)
2359 {
2360         struct rbd_device *rbd_dev = q->queuedata;
2361         bool read_only = rbd_dev->mapping.read_only;
2362         struct request *rq;
2363         int result;
2364
2365         while ((rq = blk_fetch_request(q))) {
2366                 bool write_request = rq_data_dir(rq) == WRITE;
2367                 struct rbd_img_request *img_request;
2368                 u64 offset;
2369                 u64 length;
2370
2371                 /* Ignore any non-FS requests that filter through. */
2372
2373                 if (rq->cmd_type != REQ_TYPE_FS) {
2374                         dout("%s: non-fs request type %d\n", __func__,
2375                                 (int) rq->cmd_type);
2376                         __blk_end_request_all(rq, 0);
2377                         continue;
2378                 }
2379
2380                 /* Ignore/skip any zero-length requests */
2381
2382                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2383                 length = (u64) blk_rq_bytes(rq);
2384
2385                 if (!length) {
2386                         dout("%s: zero-length request\n", __func__);
2387                         __blk_end_request_all(rq, 0);
2388                         continue;
2389                 }
2390
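                     /*
                      * The request function is entered with the queue lock
                      * held (note the __releases/__acquires annotations
                      * above).  Drop it while building and submitting the
                      * image request, which may sleep, and take it back
                      * before ending the request or fetching the next one.
                      */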
2391                 spin_unlock_irq(q->queue_lock);
2392
2393                 /* Disallow writes to a read-only device */
2394
2395                 if (write_request) {
2396                         result = -EROFS;
2397                         if (read_only)
2398                                 goto end_request;
2399                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2400                 }
2401
2402                 /*
2403                  * Quit early if the mapped snapshot no longer
2404                  * exists.  It's still possible the snapshot will
2405                  * have disappeared by the time our request arrives
2406                  * at the osd, but there's no sense in sending it if
2407                  * we already know.
2408                  */
2409                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2410                         dout("request for non-existent snapshot\n");
2411                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2412                         result = -ENXIO;
2413                         goto end_request;
2414                 }
2415
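                     /* Make sure offset + length doesn't wrap past the u64 limit */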
2416                 result = -EINVAL;
2417                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2418                         goto end_request;       /* Shouldn't happen */
2419
2420                 result = -ENOMEM;
2421                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2422                                                         write_request, false);
2423                 if (!img_request)
2424                         goto end_request;
2425
2426                 img_request->rq = rq;
2427
2428                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2429                 if (!result)
2430                         result = rbd_img_request_submit(img_request);
2431                 if (result)
2432                         rbd_img_request_put(img_request);
2433 end_request:
2434                 spin_lock_irq(q->queue_lock);
2435                 if (result < 0) {
2436                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2437                                 write_request ? "write" : "read",
2438                                 length, offset, result);
2439
2440                         __blk_end_request_all(rq, result);
2441                 }
2442         }
2443 }
2444
2445 /*
2446  * A queue callback.  Makes sure that we don't create a bio that spans
2447  * across multiple osd objects.  One exception would be a single-page
2448  * bio, which we handle later in bio_chain_clone_range().
2449  */
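     /*
      * A worked example, assuming the default obj_order of 22 (4 MiB
      * objects): sectors_per_obj = 1 << (22 - 9) = 8192.  A bio that
      * starts at device sector 8000 is 192 sectors shy of the object
      * boundary, so at most 192 << 9 = 98304 bytes can be added to an
      * empty bio before it would cross into the next object.
      */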
2450 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2451                           struct bio_vec *bvec)
2452 {
2453         struct rbd_device *rbd_dev = q->queuedata;
2454         sector_t sector_offset;
2455         sector_t sectors_per_obj;
2456         sector_t obj_sector_offset;
2457         int ret;
2458
2459         /*
2460          * Convert the partition-relative bio start sector into an
2461          * offset relative to the enclosing device, then find how far
2462          * into its rbd object that offset falls.
2463          */
2464         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2465         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2466         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2467
2468         /*
2469          * Compute the number of bytes from that offset to the end
2470          * of the object.  Account for what's already used by the bio.
2471          */
2472         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2473         if (ret > bmd->bi_size)
2474                 ret -= bmd->bi_size;
2475         else
2476                 ret = 0;
2477
2478         /*
2479          * Don't send back more than was asked for.  And if the bio
2480          * was empty, let the whole thing through because:  "Note
2481          * that a block device *must* allow a single page to be
2482          * added to an empty bio."
2483          */
2484         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2485         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2486                 ret = (int) bvec->bv_len;
2487
2488         return ret;
2489 }
2490
2491 static void rbd_free_disk(struct rbd_device *rbd_dev)
2492 {
2493         struct gendisk *disk = rbd_dev->disk;
2494
2495         if (!disk)
2496                 return;
2497
2498         if (disk->flags & GENHD_FL_UP)
2499                 del_gendisk(disk);
2500         if (disk->queue)
2501                 blk_cleanup_queue(disk->queue);
2502         put_disk(disk);
2503 }
2504
2505 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2506                                 const char *object_name,
2507                                 u64 offset, u64 length,
2508                                 char *buf, u64 *version)
2509
2510 {
2511         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2512         struct rbd_obj_request *obj_request;
2513         struct page **pages = NULL;
2514         u32 page_count;
2515         size_t size;
2516         int ret;
2517
2518         page_count = (u32) calc_pages_for(offset, length);
2519         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2520         if (IS_ERR(pages))
2521                 return PTR_ERR(pages);
2522
2523         ret = -ENOMEM;
2524         obj_request = rbd_obj_request_create(object_name, offset, length,
2525                                                         OBJ_REQUEST_PAGES);
2526         if (!obj_request)
2527                 goto out;
2528
2529         obj_request->pages = pages;
2530         obj_request->page_count = page_count;
2531
2532         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2533         if (!obj_request->osd_req)
2534                 goto out;
2535
2536         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2537                                         offset, length, 0, 0);
2538         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2539                                         obj_request->pages,
2540                                         obj_request->length,
2541                                         obj_request->offset & ~PAGE_MASK,
2542                                         false, false);
2543         rbd_osd_req_format_read(obj_request);
2544
2545         ret = rbd_obj_request_submit(osdc, obj_request);
2546         if (ret)
2547                 goto out;
2548         ret = rbd_obj_request_wait(obj_request);
2549         if (ret)
2550                 goto out;
2551
2552         ret = obj_request->result;
2553         if (ret < 0)
2554                 goto out;
2555
2556         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2557         size = (size_t) obj_request->xferred;
2558         ceph_copy_from_page_vector(pages, buf, 0, size);
2559         rbd_assert(size <= (size_t) INT_MAX);
2560         ret = (int) size;
2561         if (version)
2562                 *version = obj_request->version;
2563 out:
2564         if (obj_request)
2565                 rbd_obj_request_put(obj_request);
2566         else
2567                 ceph_release_page_vector(pages, page_count);
2568
2569         return ret;
2570 }
2571
2572 /*
2573  * Read the complete header for the given rbd device.
2574  *
2575  * Returns a pointer to a dynamically-allocated buffer containing
2576  * the complete and validated header.  Caller can pass the address
2577  * of a variable that will be filled in with the version of the
2578  * header object at the time it was read.
2579  *
2580  * Returns a pointer-coded errno if a failure occurs.
2581  */
2582 static struct rbd_image_header_ondisk *
2583 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2584 {
2585         struct rbd_image_header_ondisk *ondisk = NULL;
2586         u32 snap_count = 0;
2587         u64 names_size = 0;
2588         u32 want_count;
2589         int ret;
2590
2591         /*
2592          * The complete header will include an array of its 64-bit
2593          * snapshot ids, followed by the names of those snapshots as
2594          * a contiguous block of NUL-terminated strings.  Note that
2595          * the number of snapshots could change by the time we read
2596          * it in, in which case we re-read it.
2597          */
2598         do {
2599                 size_t size;
2600
2601                 kfree(ondisk);
2602
2603                 size = sizeof (*ondisk);
2604                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2605                 size += names_size;
2606                 ondisk = kmalloc(size, GFP_KERNEL);
2607                 if (!ondisk)
2608                         return ERR_PTR(-ENOMEM);
2609
2610                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2611                                        0, size,
2612                                        (char *) ondisk, version);
2613                 if (ret < 0)
2614                         goto out_err;
2615                 if (WARN_ON((size_t) ret < size)) {
2616                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2617                                 size, ret);
2618                         ret = -ENXIO;
2619                         goto out_err;
2620                 }
2621                 if (!rbd_dev_ondisk_valid(ondisk)) {
2622                         ret = -ENXIO;
2623                         rbd_warn(rbd_dev, "invalid header");
2624                         goto out_err;
2625                 }
2626
2627                 names_size = le64_to_cpu(ondisk->snap_names_len);
2628                 want_count = snap_count;
2629                 snap_count = le32_to_cpu(ondisk->snap_count);
2630         } while (snap_count != want_count);
2631
2632         return ondisk;
2633
2634 out_err:
2635         kfree(ondisk);
2636
2637         return ERR_PTR(ret);
2638 }
2639
2640 /*
2641  * reload the ondisk header
2642  */
2643 static int rbd_read_header(struct rbd_device *rbd_dev,
2644                            struct rbd_image_header *header)
2645 {
2646         struct rbd_image_header_ondisk *ondisk;
2647         u64 ver = 0;
2648         int ret;
2649
2650         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2651         if (IS_ERR(ondisk))
2652                 return PTR_ERR(ondisk);
2653         ret = rbd_header_from_disk(header, ondisk);
2654         if (ret >= 0)
2655                 header->obj_version = ver;
2656         kfree(ondisk);
2657
2658         return ret;
2659 }
2660
2661 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2662 {
2663         struct rbd_snap *snap;
2664         struct rbd_snap *next;
2665
2666         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2667                 rbd_remove_snap_dev(snap);
2668 }
2669
2670 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2671 {
2672         sector_t size;
2673
2674         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2675                 return;
2676
2677         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2678         dout("setting size to %llu sectors", (unsigned long long) size);
2679         rbd_dev->mapping.size = (u64) size;
2680         set_capacity(rbd_dev->disk, size);
2681 }
2682
2683 /*
2684  * Re-read the ondisk header and update our in-core copy, snapshots included
2685  */
2686 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2687 {
2688         int ret;
2689         struct rbd_image_header h;
2690
2691         ret = rbd_read_header(rbd_dev, &h);
2692         if (ret < 0)
2693                 return ret;
2694
2695         down_write(&rbd_dev->header_rwsem);
2696
2697         /* Update image size, and check for resize of mapped image */
2698         rbd_dev->header.image_size = h.image_size;
2699         rbd_update_mapping_size(rbd_dev);
2700
2701         /* rbd_dev->header.object_prefix shouldn't change */
2702         kfree(rbd_dev->header.snap_sizes);
2703         kfree(rbd_dev->header.snap_names);
2704         /* osd requests may still refer to snapc */
2705         ceph_put_snap_context(rbd_dev->header.snapc);
2706
2707         if (hver)
2708                 *hver = h.obj_version;
2709         rbd_dev->header.obj_version = h.obj_version;
2710         rbd_dev->header.image_size = h.image_size;
2711         rbd_dev->header.snapc = h.snapc;
2712         rbd_dev->header.snap_names = h.snap_names;
2713         rbd_dev->header.snap_sizes = h.snap_sizes;
2714         /* Free the extra copy of the object prefix */
2715         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2716         kfree(h.object_prefix);
2717
2718         ret = rbd_dev_snaps_update(rbd_dev);
2719         if (!ret)
2720                 ret = rbd_dev_snaps_register(rbd_dev);
2721
2722         up_write(&rbd_dev->header_rwsem);
2723
2724         return ret;
2725 }
2726
2727 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2728 {
2729         int ret;
2730
2731         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2732         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2733         if (rbd_dev->image_format == 1)
2734                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2735         else
2736                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2737         mutex_unlock(&ctl_mutex);
2738
2739         return ret;
2740 }
2741
2742 static int rbd_init_disk(struct rbd_device *rbd_dev)
2743 {
2744         struct gendisk *disk;
2745         struct request_queue *q;
2746         u64 segment_size;
2747
2748         /* create gendisk info */
2749         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2750         if (!disk)
2751                 return -ENOMEM;
2752
2753         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2754                  rbd_dev->dev_id);
2755         disk->major = rbd_dev->major;
2756         disk->first_minor = 0;
2757         disk->fops = &rbd_bd_ops;
2758         disk->private_data = rbd_dev;
2759
2760         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2761         if (!q)
2762                 goto out_disk;
2763
2764         /* We use the default size, but let's be explicit about it. */
2765         blk_queue_physical_block_size(q, SECTOR_SIZE);
2766
2767         /* set io sizes to object size */
2768         segment_size = rbd_obj_bytes(&rbd_dev->header);
2769         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2770         blk_queue_max_segment_size(q, segment_size);
2771         blk_queue_io_min(q, segment_size);
2772         blk_queue_io_opt(q, segment_size);
2773
2774         blk_queue_merge_bvec(q, rbd_merge_bvec);
2775         disk->queue = q;
2776
2777         q->queuedata = rbd_dev;
2778
2779         rbd_dev->disk = disk;
2780
2781         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2782
2783         return 0;
2784 out_disk:
2785         put_disk(disk);
2786
2787         return -ENOMEM;
2788 }
2789
2790 /*
2791   sysfs
2792 */
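     /*
      * The attributes defined below surface per-device information
      * under sysfs, e.g. (paths shown as a sketch; the ABI document
      * under Documentation/ABI/testing is authoritative):
      *
      *      /sys/bus/rbd/devices/<dev-id>/size
      *      /sys/bus/rbd/devices/<dev-id>/pool
      *      /sys/bus/rbd/devices/<dev-id>/current_snap
      */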
2793
2794 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2795 {
2796         return container_of(dev, struct rbd_device, dev);
2797 }
2798
2799 static ssize_t rbd_size_show(struct device *dev,
2800                              struct device_attribute *attr, char *buf)
2801 {
2802         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2803         sector_t size;
2804
2805         down_read(&rbd_dev->header_rwsem);
2806         size = get_capacity(rbd_dev->disk);
2807         up_read(&rbd_dev->header_rwsem);
2808
2809         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2810 }
2811
2812 /*
2813  * Note this shows the features for whatever's mapped, which is not
2814  * necessarily the base image.
2815  */
2816 static ssize_t rbd_features_show(struct device *dev,
2817                              struct device_attribute *attr, char *buf)
2818 {
2819         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2820
2821         return sprintf(buf, "0x%016llx\n",
2822                         (unsigned long long) rbd_dev->mapping.features);
2823 }
2824
2825 static ssize_t rbd_major_show(struct device *dev,
2826                               struct device_attribute *attr, char *buf)
2827 {
2828         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2829
2830         return sprintf(buf, "%d\n", rbd_dev->major);
2831 }
2832
2833 static ssize_t rbd_client_id_show(struct device *dev,
2834                                   struct device_attribute *attr, char *buf)
2835 {
2836         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2837
2838         return sprintf(buf, "client%lld\n",
2839                         ceph_client_id(rbd_dev->rbd_client->client));
2840 }
2841
2842 static ssize_t rbd_pool_show(struct device *dev,
2843                              struct device_attribute *attr, char *buf)
2844 {
2845         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2846
2847         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2848 }
2849
2850 static ssize_t rbd_pool_id_show(struct device *dev,
2851                              struct device_attribute *attr, char *buf)
2852 {
2853         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2854
2855         return sprintf(buf, "%llu\n",
2856                 (unsigned long long) rbd_dev->spec->pool_id);
2857 }
2858
2859 static ssize_t rbd_name_show(struct device *dev,
2860                              struct device_attribute *attr, char *buf)
2861 {
2862         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2863
2864         if (rbd_dev->spec->image_name)
2865                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2866
2867         return sprintf(buf, "(unknown)\n");
2868 }
2869
2870 static ssize_t rbd_image_id_show(struct device *dev,
2871                              struct device_attribute *attr, char *buf)
2872 {
2873         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2874
2875         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2876 }
2877
2878 /*
2879  * Shows the name of the currently-mapped snapshot (or
2880  * RBD_SNAP_HEAD_NAME for the base image).
2881  */
2882 static ssize_t rbd_snap_show(struct device *dev,
2883                              struct device_attribute *attr,
2884                              char *buf)
2885 {
2886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2887
2888         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2889 }
2890
2891 /*
2892  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2893  * for the parent image.  If there is no parent, simply shows
2894  * "(no parent image)".
2895  */
2896 static ssize_t rbd_parent_show(struct device *dev,
2897                              struct device_attribute *attr,
2898                              char *buf)
2899 {
2900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2901         struct rbd_spec *spec = rbd_dev->parent_spec;
2902         int count;
2903         char *bufp = buf;
2904
2905         if (!spec)
2906                 return sprintf(buf, "(no parent image)\n");
2907
2908         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2909                         (unsigned long long) spec->pool_id, spec->pool_name);
2910         if (count < 0)
2911                 return count;
2912         bufp += count;
2913
2914         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2915                         spec->image_name ? spec->image_name : "(unknown)");
2916         if (count < 0)
2917                 return count;
2918         bufp += count;
2919
2920         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2921                         (unsigned long long) spec->snap_id, spec->snap_name);
2922         if (count < 0)
2923                 return count;
2924         bufp += count;
2925
2926         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2927         if (count < 0)
2928                 return count;
2929         bufp += count;
2930
2931         return (ssize_t) (bufp - buf);
2932 }
2933
2934 static ssize_t rbd_image_refresh(struct device *dev,
2935                                  struct device_attribute *attr,
2936                                  const char *buf,
2937                                  size_t size)
2938 {
2939         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2940         int ret;
2941
2942         ret = rbd_dev_refresh(rbd_dev, NULL);
2943
2944         return ret < 0 ? ret : size;
2945 }
2946
2947 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2948 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2949 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2950 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2951 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2952 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2953 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2954 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2955 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2956 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2957 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2958
2959 static struct attribute *rbd_attrs[] = {
2960         &dev_attr_size.attr,
2961         &dev_attr_features.attr,
2962         &dev_attr_major.attr,
2963         &dev_attr_client_id.attr,
2964         &dev_attr_pool.attr,
2965         &dev_attr_pool_id.attr,
2966         &dev_attr_name.attr,
2967         &dev_attr_image_id.attr,
2968         &dev_attr_current_snap.attr,
2969         &dev_attr_parent.attr,
2970         &dev_attr_refresh.attr,
2971         NULL
2972 };
2973
2974 static struct attribute_group rbd_attr_group = {
2975         .attrs = rbd_attrs,
2976 };
2977
2978 static const struct attribute_group *rbd_attr_groups[] = {
2979         &rbd_attr_group,
2980         NULL
2981 };
2982
2983 static void rbd_sysfs_dev_release(struct device *dev)
2984 {
2985 }
2986
2987 static struct device_type rbd_device_type = {
2988         .name           = "rbd",
2989         .groups         = rbd_attr_groups,
2990         .release        = rbd_sysfs_dev_release,
2991 };
2992
2993
2994 /*
2995   sysfs - snapshots
2996 */
2997
2998 static ssize_t rbd_snap_size_show(struct device *dev,
2999                                   struct device_attribute *attr,
3000                                   char *buf)
3001 {
3002         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3003
3004         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
3005 }
3006
3007 static ssize_t rbd_snap_id_show(struct device *dev,
3008                                 struct device_attribute *attr,
3009                                 char *buf)
3010 {
3011         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3012
3013         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
3014 }
3015
3016 static ssize_t rbd_snap_features_show(struct device *dev,
3017                                 struct device_attribute *attr,
3018                                 char *buf)
3019 {
3020         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3021
3022         return sprintf(buf, "0x%016llx\n",
3023                         (unsigned long long) snap->features);
3024 }
3025
3026 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3027 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3028 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3029
3030 static struct attribute *rbd_snap_attrs[] = {
3031         &dev_attr_snap_size.attr,
3032         &dev_attr_snap_id.attr,
3033         &dev_attr_snap_features.attr,
3034         NULL,
3035 };
3036
3037 static struct attribute_group rbd_snap_attr_group = {
3038         .attrs = rbd_snap_attrs,
3039 };
3040
3041 static void rbd_snap_dev_release(struct device *dev)
3042 {
3043         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3044         kfree(snap->name);
3045         kfree(snap);
3046 }
3047
3048 static const struct attribute_group *rbd_snap_attr_groups[] = {
3049         &rbd_snap_attr_group,
3050         NULL
3051 };
3052
3053 static struct device_type rbd_snap_device_type = {
3054         .groups         = rbd_snap_attr_groups,
3055         .release        = rbd_snap_dev_release,
3056 };
3057
3058 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3059 {
3060         kref_get(&spec->kref);
3061
3062         return spec;
3063 }
3064
3065 static void rbd_spec_free(struct kref *kref);
3066 static void rbd_spec_put(struct rbd_spec *spec)
3067 {
3068         if (spec)
3069                 kref_put(&spec->kref, rbd_spec_free);
3070 }
3071
3072 static struct rbd_spec *rbd_spec_alloc(void)
3073 {
3074         struct rbd_spec *spec;
3075
3076         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3077         if (!spec)
3078                 return NULL;
3079         kref_init(&spec->kref);
3080
3081         return spec;
3082 }
3083
3084 static void rbd_spec_free(struct kref *kref)
3085 {
3086         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3087
3088         kfree(spec->pool_name);
3089         kfree(spec->image_id);
3090         kfree(spec->image_name);
3091         kfree(spec->snap_name);
3092         kfree(spec);
3093 }
3094
3095 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3096                                 struct rbd_spec *spec)
3097 {
3098         struct rbd_device *rbd_dev;
3099
3100         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3101         if (!rbd_dev)
3102                 return NULL;
3103
3104         spin_lock_init(&rbd_dev->lock);
3105         rbd_dev->flags = 0;
3106         INIT_LIST_HEAD(&rbd_dev->node);
3107         INIT_LIST_HEAD(&rbd_dev->snaps);
3108         init_rwsem(&rbd_dev->header_rwsem);
3109
3110         rbd_dev->spec = spec;
3111         rbd_dev->rbd_client = rbdc;
3112
3113         /* Initialize the layout used for all rbd requests */
3114
3115         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3116         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3117         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3118         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3119
3120         return rbd_dev;
3121 }
3122
3123 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3124 {
3125         rbd_spec_put(rbd_dev->parent_spec);
3126         kfree(rbd_dev->header_name);
3127         rbd_put_client(rbd_dev->rbd_client);
3128         rbd_spec_put(rbd_dev->spec);
3129         kfree(rbd_dev);
3130 }
3131
3132 static bool rbd_snap_registered(struct rbd_snap *snap)
3133 {
3134         bool ret = snap->dev.type == &rbd_snap_device_type;
3135         bool reg = device_is_registered(&snap->dev);
3136
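             /*
              * A snap's device type is assigned only when the device is
              * registered (see rbd_register_snap_dev()), so the two
              * tests above must always agree.
              */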
3137         rbd_assert(!ret ^ reg);
3138
3139         return ret;
3140 }
3141
3142 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3143 {
3144         list_del(&snap->node);
3145         if (device_is_registered(&snap->dev))
3146                 device_unregister(&snap->dev);
3147 }
3148
3149 static int rbd_register_snap_dev(struct rbd_snap *snap,
3150                                   struct device *parent)
3151 {
3152         struct device *dev = &snap->dev;
3153         int ret;
3154
3155         dev->type = &rbd_snap_device_type;
3156         dev->parent = parent;
3157         dev->release = rbd_snap_dev_release;
3158         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3159         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3160
3161         ret = device_register(dev);
3162
3163         return ret;
3164 }
3165
3166 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3167                                                 const char *snap_name,
3168                                                 u64 snap_id, u64 snap_size,
3169                                                 u64 snap_features)
3170 {
3171         struct rbd_snap *snap;
3172         int ret;
3173
3174         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3175         if (!snap)
3176                 return ERR_PTR(-ENOMEM);
3177
3178         ret = -ENOMEM;
3179         snap->name = kstrdup(snap_name, GFP_KERNEL);
3180         if (!snap->name)
3181                 goto err;
3182
3183         snap->id = snap_id;
3184         snap->size = snap_size;
3185         snap->features = snap_features;
3186
3187         return snap;
3188
3189 err:
3190         kfree(snap->name);
3191         kfree(snap);
3192
3193         return ERR_PTR(ret);
3194 }
3195
3196 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3197                 u64 *snap_size, u64 *snap_features)
3198 {
3199         char *snap_name;
3200
3201         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3202
3203         *snap_size = rbd_dev->header.snap_sizes[which];
3204         *snap_features = 0;     /* No features for v1 */
3205
3206         /* Skip over names until we find the one we are looking for */
3207
3208         snap_name = rbd_dev->header.snap_names;
3209         while (which--)
3210                 snap_name += strlen(snap_name) + 1;
3211
3212         return snap_name;
3213 }
3214
3215 /*
3216  * Get the size and object order for an image snapshot, or if
3217  * snap_id is CEPH_NOSNAP, gets this information for the base
3218  * image.
3219  */
3220 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3221                                 u8 *order, u64 *snap_size)
3222 {
3223         __le64 snapid = cpu_to_le64(snap_id);
3224         int ret;
3225         struct {
3226                 u8 order;
3227                 __le64 size;
3228         } __attribute__ ((packed)) size_buf = { 0 };
3229
3230         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3231                                 "rbd", "get_size",
3232                                 (char *) &snapid, sizeof (snapid),
3233                                 (char *) &size_buf, sizeof (size_buf), NULL);
3234         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3235         if (ret < 0)
3236                 return ret;
3237
3238         *order = size_buf.order;
3239         *snap_size = le64_to_cpu(size_buf.size);
3240
3241         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3242                 (unsigned long long) snap_id, (unsigned int) *order,
3243                 (unsigned long long) *snap_size);
3244
3245         return 0;
3246 }
3247
3248 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3249 {
3250         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3251                                         &rbd_dev->header.obj_order,
3252                                         &rbd_dev->header.image_size);
3253 }
3254
3255 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3256 {
3257         void *reply_buf;
3258         int ret;
3259         void *p;
3260
3261         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3262         if (!reply_buf)
3263                 return -ENOMEM;
3264
3265         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3266                                 "rbd", "get_object_prefix",
3267                                 NULL, 0,
3268                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3269         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3270         if (ret < 0)
3271                 goto out;
3272
3273         p = reply_buf;
3274         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3275                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3276                                                 NULL, GFP_NOIO);
3277
3278         if (IS_ERR(rbd_dev->header.object_prefix)) {
3279                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3280                 rbd_dev->header.object_prefix = NULL;
3281         } else {
3282                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3283         }
3284
3285 out:
3286         kfree(reply_buf);
3287
3288         return ret;
3289 }
3290
3291 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3292                 u64 *snap_features)
3293 {
3294         __le64 snapid = cpu_to_le64(snap_id);
3295         struct {
3296                 __le64 features;
3297                 __le64 incompat;
3298         } features_buf = { 0 };
3299         u64 incompat;
3300         int ret;
3301
3302         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3303                                 "rbd", "get_features",
3304                                 (char *) &snapid, sizeof (snapid),
3305                                 (char *) &features_buf, sizeof (features_buf),
3306                                 NULL);
3307         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3308         if (ret < 0)
3309                 return ret;
3310
3311         incompat = le64_to_cpu(features_buf.incompat);
3312         if (incompat & ~RBD_FEATURES_SUPPORTED)
3313                 return -ENXIO;
3314
3315         *snap_features = le64_to_cpu(features_buf.features);
3316
3317         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3318                 (unsigned long long) snap_id,
3319                 (unsigned long long) *snap_features,
3320                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3321
3322         return 0;
3323 }
3324
3325 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3326 {
3327         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3328                                                 &rbd_dev->header.features);
3329 }
3330
3331 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3332 {
3333         struct rbd_spec *parent_spec;
3334         size_t size;
3335         void *reply_buf = NULL;
3336         __le64 snapid;
3337         void *p;
3338         void *end;
3339         char *image_id;
3340         u64 overlap;
3341         int ret;
3342
3343         parent_spec = rbd_spec_alloc();
3344         if (!parent_spec)
3345                 return -ENOMEM;
3346
3347         size = sizeof (__le64) +                                /* pool_id */
3348                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3349                 sizeof (__le64) +                               /* snap_id */
3350                 sizeof (__le64);                                /* overlap */
3351         reply_buf = kmalloc(size, GFP_KERNEL);
3352         if (!reply_buf) {
3353                 ret = -ENOMEM;
3354                 goto out_err;
3355         }
3356
3357         snapid = cpu_to_le64(CEPH_NOSNAP);
3358         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3359                                 "rbd", "get_parent",
3360                                 (char *) &snapid, sizeof (snapid),
3361                                 (char *) reply_buf, size, NULL);
3362         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3363         if (ret < 0)
3364                 goto out_err;
3365
3366         ret = -ERANGE;
3367         p = reply_buf;
3368         end = (char *) reply_buf + size;
3369         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3370         if (parent_spec->pool_id == CEPH_NOPOOL)
3371                 goto out;       /* No parent?  No problem. */
3372
3373         /* The ceph file layout needs to fit pool id in 32 bits */
3374
3375         ret = -EIO;
3376         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3377                 goto out_err;
3378
3379         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3380         if (IS_ERR(image_id)) {
3381                 ret = PTR_ERR(image_id);
3382                 goto out_err;
3383         }
3384         parent_spec->image_id = image_id;
3385         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3386         ceph_decode_64_safe(&p, end, overlap, out_err);
3387
3388         rbd_dev->parent_overlap = overlap;
3389         rbd_dev->parent_spec = parent_spec;
3390         parent_spec = NULL;     /* rbd_dev now owns this */
3391 out:
3392         ret = 0;
3393 out_err:
3394         kfree(reply_buf);
3395         rbd_spec_put(parent_spec);
3396
3397         return ret;
3398 }
3399
3400 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3401 {
3402         size_t image_id_size;
3403         char *image_id;
3404         void *p;
3405         void *end;
3406         size_t size;
3407         void *reply_buf = NULL;
3408         size_t len = 0;
3409         char *image_name = NULL;
3410         int ret;
3411
3412         rbd_assert(!rbd_dev->spec->image_name);
3413
3414         len = strlen(rbd_dev->spec->image_id);
3415         image_id_size = sizeof (__le32) + len;
3416         image_id = kmalloc(image_id_size, GFP_KERNEL);
3417         if (!image_id)
3418                 return NULL;
3419
3420         p = image_id;
3421         end = (char *) image_id + image_id_size;
3422         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3423
3424         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3425         reply_buf = kmalloc(size, GFP_KERNEL);
3426         if (!reply_buf)
3427                 goto out;
3428
3429         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3430                                 "rbd", "dir_get_name",
3431                                 image_id, image_id_size,
3432                                 (char *) reply_buf, size, NULL);
3433         if (ret < 0)
3434                 goto out;
3435         p = reply_buf;
3436         end = (char *) reply_buf + size;
3437         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3438         if (IS_ERR(image_name))
3439                 image_name = NULL;
3440         else
3441                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3442 out:
3443         kfree(reply_buf);
3444         kfree(image_id);
3445
3446         return image_name;
3447 }
3448
3449 /*
3450  * When a parent image gets probed, we only have the pool, image,
3451  * and snapshot ids but not the names of any of them.  This call
3452  * is made later to fill in those names.  It has to be done after
3453  * rbd_dev_snaps_update() has completed because some of the
3454  * information (in particular, snapshot name) is not available
3455  * until then.
3456  */
3457 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3458 {
3459         struct ceph_osd_client *osdc;
3460         const char *name;
3462         int ret;
3463
3464         if (rbd_dev->spec->pool_name)
3465                 return 0;       /* Already have the names */
3466
3467         /* Look up the pool name */
3468
3469         osdc = &rbd_dev->rbd_client->client->osdc;
3470         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3471         if (!name) {
3472                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3473                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3474                 return -EIO;
3475         }
3476
3477         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3478         if (!rbd_dev->spec->pool_name)
3479                 return -ENOMEM;
3480
3481         /* Fetch the image name; tolerate failure here */
3482
3483         name = rbd_dev_image_name(rbd_dev);
3484         if (name)
3485                 rbd_dev->spec->image_name = (char *) name;
3486         else
3487                 rbd_warn(rbd_dev, "unable to get image name");
3488
3489         /* Look up the snapshot name. */
3490
3491         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3492         if (!name) {
3493                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3494                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3495                 ret = -EIO;
3496                 goto out_err;
3497         }
3498         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3499         if (!rbd_dev->spec->snap_name) {
3500                 ret = -ENOMEM;
                     goto out_err;
             }
3501
3502         return 0;
3503 out_err:
3505         kfree(rbd_dev->spec->pool_name);
3506         rbd_dev->spec->pool_name = NULL;
3507
3508         return ret;
3509 }
3510
3511 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3512 {
3513         size_t size;
3514         int ret;
3515         void *reply_buf;
3516         void *p;
3517         void *end;
3518         u64 seq;
3519         u32 snap_count;
3520         struct ceph_snap_context *snapc;
3521         u32 i;
3522
3523         /*
3524          * We'll need room for the seq value (maximum snapshot id),
3525          * snapshot count, and array of that many snapshot ids.
3526          * For now we have a fixed upper limit on the number we're
3527          * prepared to receive.
3528          */
3529         size = sizeof (__le64) + sizeof (__le32) +
3530                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
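             /*
              * Worked example of the sizing above: with 8-byte snapshot
              * ids, a reply holding N ids occupies 8 (seq) + 4 (count) +
              * 8 * N bytes, so this fixed-size buffer bounds the number
              * of ids we can accept at RBD_MAX_SNAP_COUNT.
              */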
3531         reply_buf = kzalloc(size, GFP_KERNEL);
3532         if (!reply_buf)
3533                 return -ENOMEM;
3534
3535         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3536                                 "rbd", "get_snapcontext",
3537                                 NULL, 0,
3538                                 reply_buf, size, ver);
3539         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3540         if (ret < 0)
3541                 goto out;
3542
3543         ret = -ERANGE;
3544         p = reply_buf;
3545         end = (char *) reply_buf + size;
3546         ceph_decode_64_safe(&p, end, seq, out);
3547         ceph_decode_32_safe(&p, end, snap_count, out);
3548
3549         /*
3550          * Make sure the reported number of snapshot ids wouldn't go
3551          * beyond the end of our buffer.  But before checking that,
3552          * make sure the computed size of the snapshot context we
3553          * allocate is representable in a size_t.
3554          */
3555         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3556                                  / sizeof (u64)) {
3557                 ret = -EINVAL;
3558                 goto out;
3559         }
3560         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3561                 goto out;
3562
3563         size = sizeof (struct ceph_snap_context) +
3564                                 snap_count * sizeof (snapc->snaps[0]);
3565         snapc = kmalloc(size, GFP_KERNEL);
3566         if (!snapc) {
3567                 ret = -ENOMEM;
3568                 goto out;
3569         }
3570
3571         atomic_set(&snapc->nref, 1);
3572         snapc->seq = seq;
3573         snapc->num_snaps = snap_count;
3574         for (i = 0; i < snap_count; i++)
3575                 snapc->snaps[i] = ceph_decode_64(&p);
3576
3577         rbd_dev->header.snapc = snapc;
3578
3579         dout("  snap context seq = %llu, snap_count = %u\n",
3580                 (unsigned long long) seq, (unsigned int) snap_count);
3581         ret = 0;
3582 out:
3583         kfree(reply_buf);
3584
3585         return ret;
3586 }
3587
3588 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3589 {
3590         size_t size;
3591         void *reply_buf;
3592         __le64 snap_id;
3593         int ret;
3594         void *p;
3595         void *end;
3596         char *snap_name;
3597
3598         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3599         reply_buf = kmalloc(size, GFP_KERNEL);
3600         if (!reply_buf)
3601                 return ERR_PTR(-ENOMEM);
3602
3603         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3604         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3605                                 "rbd", "get_snapshot_name",
3606                                 (char *) &snap_id, sizeof (snap_id),
3607                                 reply_buf, size, NULL);
3608         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3609         if (ret < 0)
3610                 goto out;
3611
3612         p = reply_buf;
3613         end = (char *) reply_buf + size;
3614         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3615         if (IS_ERR(snap_name)) {
3616                 ret = PTR_ERR(snap_name);
3617                 goto out;
3618         } else {
3619                 dout("  snap_id 0x%016llx snap_name = %s\n",
3620                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3621         }
3622         kfree(reply_buf);
3623
3624         return snap_name;
3625 out:
3626         kfree(reply_buf);
3627
3628         return ERR_PTR(ret);
3629 }
3630
3631 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3632                 u64 *snap_size, u64 *snap_features)
3633 {
3634         u64 snap_id;
3635         u8 order;
3636         int ret;
3637
3638         snap_id = rbd_dev->header.snapc->snaps[which];
3639         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3640         if (ret)
3641                 return ERR_PTR(ret);
3642         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3643         if (ret)
3644                 return ERR_PTR(ret);
3645
3646         return rbd_dev_v2_snap_name(rbd_dev, which);
3647 }
3648
3649 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3650                 u64 *snap_size, u64 *snap_features)
3651 {
3652         if (rbd_dev->image_format == 1)
3653                 return rbd_dev_v1_snap_info(rbd_dev, which,
3654                                         snap_size, snap_features);
3655         if (rbd_dev->image_format == 2)
3656                 return rbd_dev_v2_snap_info(rbd_dev, which,
3657                                         snap_size, snap_features);
3658         return ERR_PTR(-EINVAL);
3659 }
3660
3661 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3662 {
3663         int ret;
3664         __u8 obj_order;
3665
3666         down_write(&rbd_dev->header_rwsem);
3667
3668         /* Grab old order first, to see if it changes */
3669
3670         obj_order = rbd_dev->header.obj_order;
3671         ret = rbd_dev_v2_image_size(rbd_dev);
3672         if (ret)
3673                 goto out;
3674         if (rbd_dev->header.obj_order != obj_order) {
3675                 ret = -EIO;
3676                 goto out;
3677         }
3678         rbd_update_mapping_size(rbd_dev);
3679
3680         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3681         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3682         if (ret)
3683                 goto out;
3684         ret = rbd_dev_snaps_update(rbd_dev);
3685         dout("rbd_dev_snaps_update returned %d\n", ret);
3686         if (ret)
3687                 goto out;
3688         ret = rbd_dev_snaps_register(rbd_dev);
3689         dout("rbd_dev_snaps_register returned %d\n", ret);
3690 out:
3691         up_write(&rbd_dev->header_rwsem);
3692
3693         return ret;
3694 }
3695
3696 /*
3697  * Scan the rbd device's current snapshot list and compare it to the
3698  * newly-received snapshot context.  Remove any existing snapshots
3699  * not present in the new snapshot context.  Add a new snapshot for
3700  * any snapshots in the snapshot context not in the current list.
3701  * And verify there are no changes to snapshots we already know
3702  * about.
3703  *
3704  * Assumes the snapshots in the snapshot context are sorted by
3705  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3706  * are also maintained in that order.)
3707  */
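     /*
      * Worked example (hypothetical ids): if the device's list holds
      * snapshots {12, 10, 7} and the new context holds {12, 11, 7},
      * the walk below verifies 12, inserts 11 ahead of 10, removes
      * 10, and finally verifies 7, leaving the list as {12, 11, 7}.
      */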
3708 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3709 {
3710         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3711         const u32 snap_count = snapc->num_snaps;
3712         struct list_head *head = &rbd_dev->snaps;
3713         struct list_head *links = head->next;
3714         u32 index = 0;
3715
3716         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3717         while (index < snap_count || links != head) {
3718                 u64 snap_id;
3719                 struct rbd_snap *snap;
3720                 char *snap_name;
3721                 u64 snap_size = 0;
3722                 u64 snap_features = 0;
3723
3724                 snap_id = index < snap_count ? snapc->snaps[index]
3725                                              : CEPH_NOSNAP;
3726                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3727                                      : NULL;
3728                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3729
3730                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3731                         struct list_head *next = links->next;
3732
3733                         /*
3734                          * A previously-existing snapshot is not in
3735                          * the new snap context.
3736                          *
3737                          * If the now missing snapshot is the one the
3738                          * image is mapped to, clear its exists flag
3739                          * so we can avoid sending any more requests
3740                          * to it.
3741                          */
3742                         if (rbd_dev->spec->snap_id == snap->id)
3743                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3744                         rbd_remove_snap_dev(snap);
3745                         dout("%ssnap id %llu has been removed\n",
3746                                 rbd_dev->spec->snap_id == snap->id ?
3747                                                         "mapped " : "",
3748                                 (unsigned long long) snap->id);
3749
3750                         /* Done with this list entry; advance */
3751
3752                         links = next;
3753                         continue;
3754                 }
3755
3756                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3757                                         &snap_size, &snap_features);
3758                 if (IS_ERR(snap_name))
3759                         return PTR_ERR(snap_name);
3760
3761                 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3762                         (unsigned long long) snap_id);
3763                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3764                         struct rbd_snap *new_snap;
3765
3766                         /* We haven't seen this snapshot before */
3767
3768                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3769                                         snap_id, snap_size, snap_features);
3770                         if (IS_ERR(new_snap)) {
3771                                 int err = PTR_ERR(new_snap);
3772
3773                                 dout("  failed to add dev, error %d\n", err);
3774
3775                                 return err;
3776                         }
3777
3778                         /* New goes before existing, or at end of list */
3779
3780                         dout("  added dev%s\n", snap ? "" : " at end");
3781                         if (snap)
3782                                 list_add_tail(&new_snap->node, &snap->node);
3783                         else
3784                                 list_add_tail(&new_snap->node, head);
3785                 } else {
3786                         /* Already have this one */
3787
3788                         dout("  already present\n");
3789
3790                         rbd_assert(snap->size == snap_size);
3791                         rbd_assert(!strcmp(snap->name, snap_name));
3792                         rbd_assert(snap->features == snap_features);
3793
3794                         /* Done with this list entry; advance */
3795
3796                         links = links->next;
3797                 }
3798
3799                 /* Advance to the next entry in the snapshot context */
3800
3801                 index++;
3802         }
3803         dout("%s: done\n", __func__);
3804
3805         return 0;
3806 }
3807
3808 /*
3809  * Scan the list of snapshots and register the devices for any that
3810  * have not already been registered.
3811  */
3812 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3813 {
3814         struct rbd_snap *snap;
3815         int ret = 0;
3816
3817         dout("%s:\n", __func__);
3818         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3819                 return -EIO;
3820
3821         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3822                 if (!rbd_snap_registered(snap)) {
3823                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3824                         if (ret < 0)
3825                                 break;
3826                 }
3827         }
3828         dout("%s: returning %d\n", __func__, ret);
3829
3830         return ret;
3831 }
3832
3833 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3834 {
3835         struct device *dev;
3836         int ret;
3837
3838         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3839
3840         dev = &rbd_dev->dev;
3841         dev->bus = &rbd_bus_type;
3842         dev->type = &rbd_device_type;
3843         dev->parent = &rbd_root_dev;
3844         dev->release = rbd_dev_release;
3845         dev_set_name(dev, "%d", rbd_dev->dev_id);
3846         ret = device_register(dev);
3847
3848         mutex_unlock(&ctl_mutex);
3849
3850         return ret;
3851 }
3852
3853 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3854 {
3855         device_unregister(&rbd_dev->dev);
3856 }
3857
3858 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3859
3860 /*
3861  * Get a unique rbd identifier for the given new rbd_dev, and add
3862  * the rbd_dev to the global list.  The minimum rbd id is 1.
3863  */
3864 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3865 {
3866         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3867
3868         spin_lock(&rbd_dev_list_lock);
3869         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3870         spin_unlock(&rbd_dev_list_lock);
3871         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3872                 (unsigned long long) rbd_dev->dev_id);
3873 }
3874
3875 /*
3876  * Remove an rbd_dev from the global list, and record that its
3877  * identifier is no longer in use.
3878  */
3879 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3880 {
3881         struct list_head *tmp;
3882         int rbd_id = rbd_dev->dev_id;
3883         int max_id;
3884
3885         rbd_assert(rbd_id > 0);
3886
3887         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3888                 (unsigned long long) rbd_dev->dev_id);
3889         spin_lock(&rbd_dev_list_lock);
3890         list_del_init(&rbd_dev->node);
3891
3892         /*
3893          * If the id being "put" is not the current maximum, there
3894          * is nothing special we need to do.
3895          */
3896         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3897                 spin_unlock(&rbd_dev_list_lock);
3898                 return;
3899         }
3900
3901         /*
3902          * We need to update the current maximum id.  Search the
3903          * list to find out what it is.  We're more likely to find
3904          * the maximum at the end, so search the list backward.
3905          */
3906         max_id = 0;
3907         list_for_each_prev(tmp, &rbd_dev_list) {
3908                 struct rbd_device *rbd_dev;
3909
3910                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3911                 if (rbd_dev->dev_id > max_id)
3912                         max_id = rbd_dev->dev_id;
3913         }
3914         spin_unlock(&rbd_dev_list_lock);
3915
3916         /*
3917          * The max id could have been updated by rbd_dev_id_get(), in
3918          * which case it now accurately reflects the new maximum.
3919          * Be careful not to overwrite the maximum value in that
3920          * case.
3921          */
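             /*
              * Concrete (hypothetical) race: if id 3 was the maximum
              * and is being put while rbd_dev_id_get() concurrently
              * hands out id 4, the cmpxchg below finds 4 rather than 3
              * and stores nothing, preserving the newer maximum.
              */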
3922         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3923         dout("  max dev id has been reset\n");
3924 }
3925
3926 /*
3927  * Skips over white space at *buf, and updates *buf to point to the
3928  * first found non-space character (if any). Returns the length of
3929  * the token (string of non-white space characters) found.  Note
3930  * that *buf must be terminated with '\0'.
3931  */
3932 static inline size_t next_token(const char **buf)
3933 {
3934         /*
3935          * These are the characters that produce nonzero for
3936          * isspace() in the "C" and "POSIX" locales.
3937          */
3938         const char *spaces = " \f\n\r\t\v";
3939
3940         *buf += strspn(*buf, spaces);   /* Find start of token */
3941
3942         return strcspn(*buf, spaces);   /* Return token length */
3943 }
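     /*
      * Illustrative use (hypothetical values):
      *
      *         const char *p = "  rbd foo";
      *         size_t len = next_token(&p);    (p now -> "rbd foo", len == 3)
      *         p += len;                       (p now -> " foo")
      */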
3944
3945 /*
3946  * Finds the next token in *buf, and if the provided token buffer is
3947  * big enough, copies the found token into it.  The result, if
3948  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3949  * must be terminated with '\0' on entry.
3950  *
3951  * Returns the length of the token found (not including the '\0').
3952  * Return value will be 0 if no token is found, and it will be >=
3953  * token_size if the token would not fit.
3954  *
3955  * The *buf pointer will be updated to point beyond the end of the
3956  * found token.  Note that this occurs even if the token buffer is
3957  * too small to hold it.
3958  */
3959 static inline size_t copy_token(const char **buf,
3960                                 char *token,
3961                                 size_t token_size)
3962 {
3963         size_t len;
3964
3965         len = next_token(buf);
3966         if (len < token_size) {
3967                 memcpy(token, *buf, len);
3968                 *(token + len) = '\0';
3969         }
3970         *buf += len;
3971
3972         return len;
3973 }
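     /*
      * For example (hypothetical values): copying the token "images"
      * into a 4-byte token buffer returns 6 without filling the buffer
      * (6 >= 4), yet *buf is still advanced past the token; callers
      * must therefore compare the return value against their buffer
      * size before trusting the copy.
      */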
3974
3975 /*
3976  * Finds the next token in *buf, dynamically allocates a buffer big
3977  * enough to hold a copy of it, and copies the token into the new
3978  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3979  * that a duplicate buffer is created even for a zero-length token.
3980  *
3981  * Returns a pointer to the newly-allocated duplicate, or a null
3982  * pointer if memory for the duplicate was not available.  If
3983  * the lenp argument is a non-null pointer, the length of the token
3984  * (not including the '\0') is returned in *lenp.
3985  *
3986  * If successful, the *buf pointer will be updated to point beyond
3987  * the end of the found token.
3988  *
3989  * Note: uses GFP_KERNEL for allocation.
3990  */
3991 static inline char *dup_token(const char **buf, size_t *lenp)
3992 {
3993         char *dup;
3994         size_t len;
3995
3996         len = next_token(buf);
3997         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3998         if (!dup)
3999                 return NULL;
4000         *(dup + len) = '\0';
4001         *buf += len;
4002
4003         if (lenp)
4004                 *lenp = len;
4005
4006         return dup;
4007 }
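     /*
      * For example (hypothetical values): with *buf -> "rbd foo",
      * dup_token(&buf, &len) returns a kmalloc()ed copy of "rbd",
      * sets len to 3, and leaves *buf -> " foo".  The caller owns
      * the copy and must kfree() it.
      */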
4008
4009 /*
4010  * Parse the options provided for an "rbd add" (i.e., rbd image
4011  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4012  * and the data written is passed here via a NUL-terminated buffer.
4013  * Returns 0 if successful or an error code otherwise.
4014  *
4015  * The information extracted from these options is recorded in
4016  * the other parameters which return dynamically-allocated
4017  * structures:
4018  *  ceph_opts
4019  *      The address of a pointer that will refer to a ceph options
4020  *      structure.  Caller must release the returned pointer using
4021  *      ceph_destroy_options() when it is no longer needed.
4022  *  rbd_opts
4023  *      Address of an rbd options pointer.  Fully initialized by
4024  *      this function; caller must release with kfree().
4025  *  spec
4026  *      Address of an rbd image specification pointer.  Fully
4027  *      initialized by this function based on parsed options.
4028  *      Caller must release with rbd_spec_put().
4029  *
4030  * The options passed take this form:
4031  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4032  * where:
4033  *  <mon_addrs>
4034  *      A comma-separated list of one or more monitor addresses.
4035  *      A monitor address is an ip address, optionally followed
4036  *      by a port number (separated by a colon).
4037  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4038  *  <options>
4039  *      A comma-separated list of ceph and/or rbd options.
4040  *  <pool_name>
4041  *      The name of the rados pool containing the rbd image.
4042  *  <image_name>
4043  *      The name of the image in that pool to map.
4044  *  <snap_name>
4045  *      An optional snapshot name.  If provided, the mapping will
4046  *      present data from the image at the time that snapshot was
4047  *      created.  The image head is used if no snapshot name is
4048  *      provided.  Snapshot mappings are always read-only.
4049  */
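     /*
      * A concrete add request built from the grammar above (addresses,
      * options, and names are illustrative only):
      *
      *         echo "1.2.3.4:6789 name=admin rbd foo mysnap" \
      *                 > /sys/bus/rbd/add
      *
      * would map snapshot "mysnap" of image "foo" in pool "rbd",
      * using the monitor at 1.2.3.4:6789.
      */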
4050 static int rbd_add_parse_args(const char *buf,
4051                                 struct ceph_options **ceph_opts,
4052                                 struct rbd_options **opts,
4053                                 struct rbd_spec **rbd_spec)
4054 {
4055         size_t len;
4056         char *options;
4057         const char *mon_addrs;
4058         size_t mon_addrs_size;
4059         struct rbd_spec *spec = NULL;
4060         struct rbd_options *rbd_opts = NULL;
4061         struct ceph_options *copts;
4062         int ret;
4063
4064         /* The first four tokens are required */
4065
4066         len = next_token(&buf);
4067         if (!len) {
4068                 rbd_warn(NULL, "no monitor address(es) provided");
4069                 return -EINVAL;
4070         }
4071         mon_addrs = buf;
4072         mon_addrs_size = len + 1;
4073         buf += len;
4074
4075         ret = -EINVAL;
4076         options = dup_token(&buf, NULL);
4077         if (!options)
4078                 return -ENOMEM;
4079         if (!*options) {
4080                 rbd_warn(NULL, "no options provided");
4081                 goto out_err;
4082         }
4083
4084         spec = rbd_spec_alloc();
4085         if (!spec)
4086                 goto out_mem;
4087
4088         spec->pool_name = dup_token(&buf, NULL);
4089         if (!spec->pool_name)
4090                 goto out_mem;
4091         if (!*spec->pool_name) {
4092                 rbd_warn(NULL, "no pool name provided");
4093                 goto out_err;
4094         }
4095
4096         spec->image_name = dup_token(&buf, NULL);
4097         if (!spec->image_name)
4098                 goto out_mem;
4099         if (!*spec->image_name) {
4100                 rbd_warn(NULL, "no image name provided");
4101                 goto out_err;
4102         }
4103
4104         /*
4105          * Snapshot name is optional; default is to use "-"
4106          * (indicating the head/no snapshot).
4107          */
4108         len = next_token(&buf);
4109         if (!len) {
4110                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4111                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4112         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4113                 ret = -ENAMETOOLONG;
4114                 goto out_err;
4115         }
4116         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4117         if (!spec->snap_name)
4118                 goto out_mem;
4119         *(spec->snap_name + len) = '\0';
4120
4121         /* Initialize all rbd options to the defaults */
4122
4123         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4124         if (!rbd_opts)
4125                 goto out_mem;
4126
4127         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4128
4129         copts = ceph_parse_options(options, mon_addrs,
4130                                         mon_addrs + mon_addrs_size - 1,
4131                                         parse_rbd_opts_token, rbd_opts);
4132         if (IS_ERR(copts)) {
4133                 ret = PTR_ERR(copts);
4134                 goto out_err;
4135         }
4136         kfree(options);
4137
4138         *ceph_opts = copts;
4139         *opts = rbd_opts;
4140         *rbd_spec = spec;
4141
4142         return 0;
4143 out_mem:
4144         ret = -ENOMEM;
4145 out_err:
4146         kfree(rbd_opts);
4147         rbd_spec_put(spec);
4148         kfree(options);
4149
4150         return ret;
4151 }
4152
4153 /*
4154  * An rbd format 2 image has a unique identifier, distinct from the
4155  * name given to it by the user.  Internally, that identifier is
4156  * what's used to specify the names of objects related to the image.
4157  *
4158  * A special "rbd id" object is used to map an rbd image name to its
4159  * id.  If that object doesn't exist, then there is no v2 rbd image
4160  * with the supplied name.
4161  *
4162  * This function will record the given rbd_dev's image_id field if
4163  * it can be determined, and in that case will return 0.  If any
4164  * errors occur a negative errno will be returned and the rbd_dev's
4165  * image_id field will be unchanged (and should be NULL).
4166  */
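     /*
      * For example (illustrative): when mapping image "foo", the id
      * object consulted below is named by appending "foo" to
      * RBD_ID_PREFIX, and the encoded id string it returns is what the
      * names of the image's other format 2 objects are built from.
      */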
4167 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4168 {
4169         int ret;
4170         size_t size;
4171         char *object_name;
4172         void *response;
4173         void *p;
4174
4175         /*
4176          * If we already have it we don't need to look it up.
4177          * (When probing a parent image, the image id is already
4178          * known and the image name likely is not; there's no
4179          * need to fetch the image id again in that case.)
4180          */
4181         if (rbd_dev->spec->image_id)
4182                 return 0;
4187
4188         /*
4189          * First, see if the format 2 image id file exists, and if
4190          * so, get the image's persistent id from it.
4191          */
4192         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4193         object_name = kmalloc(size, GFP_NOIO);
4194         if (!object_name)
4195                 return -ENOMEM;
4196         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4197         dout("rbd id object name is %s\n", object_name);
4198
4199         /* Response will be an encoded string, which includes a length */
4200
4201         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4202         response = kzalloc(size, GFP_NOIO);
4203         if (!response) {
4204                 ret = -ENOMEM;
4205                 goto out;
4206         }
4207
4208         ret = rbd_obj_method_sync(rbd_dev, object_name,
4209                                 "rbd", "get_id",
4210                                 NULL, 0,
4211                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4212         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4213         if (ret < 0)
4214                 goto out;
4215
4216         p = response;
4217         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4218                                                 p + RBD_IMAGE_ID_LEN_MAX,
4219                                                 NULL, GFP_NOIO);
4220         if (IS_ERR(rbd_dev->spec->image_id)) {
4221                 ret = PTR_ERR(rbd_dev->spec->image_id);
4222                 rbd_dev->spec->image_id = NULL;
4223         } else {
4224                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4225         }
4226 out:
4227         kfree(response);
4228         kfree(object_name);
4229
4230         return ret;
4231 }
4232
4233 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4234 {
4235         int ret;
4236         size_t size;
4237
4238         /* Version 1 images have no id; empty string is used */
4239
4240         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4241         if (!rbd_dev->spec->image_id)
4242                 return -ENOMEM;
4243
4244         /* Record the header object name for this rbd image. */
4245
4246         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4247         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4248         if (!rbd_dev->header_name) {
4249                 ret = -ENOMEM;
4250                 goto out_err;
4251         }
4252         sprintf(rbd_dev->header_name, "%s%s",
4253                 rbd_dev->spec->image_name, RBD_SUFFIX);
4254
4255         /* Populate rbd image metadata */
4256
4257         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4258         if (ret < 0)
4259                 goto out_err;
4260
4261         /* Version 1 images have no parent (no layering) */
4262
4263         rbd_dev->parent_spec = NULL;
4264         rbd_dev->parent_overlap = 0;
4265
4266         rbd_dev->image_format = 1;
4267
4268         dout("discovered version 1 image, header name is %s\n",
4269                 rbd_dev->header_name);
4270
4271         return 0;
4272
4273 out_err:
4274         kfree(rbd_dev->header_name);
4275         rbd_dev->header_name = NULL;
4276         kfree(rbd_dev->spec->image_id);
4277         rbd_dev->spec->image_id = NULL;
4278
4279         return ret;
4280 }
4281
4282 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4283 {
4284         size_t size;
4285         int ret;
4286         u64 ver = 0;
4287
4288         /*
4289          * Image id was filled in by the caller.  Record the header
4290          * object name for this rbd image.
4291          */
4292         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4293         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4294         if (!rbd_dev->header_name)
4295                 return -ENOMEM;
4296         sprintf(rbd_dev->header_name, "%s%s",
4297                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4298
4299         /* Get the size and object order for the image */
4300
4301         ret = rbd_dev_v2_image_size(rbd_dev);
4302         if (ret < 0)
4303                 goto out_err;
4304
4305         /* Get the object prefix (a.k.a. block_name) for the image */
4306
4307         ret = rbd_dev_v2_object_prefix(rbd_dev);
4308         if (ret < 0)
4309                 goto out_err;
4310
4311         /* Get and check the features for the image */
4312
4313         ret = rbd_dev_v2_features(rbd_dev);
4314         if (ret < 0)
4315                 goto out_err;
4316
4317         /* If the image supports layering, get the parent info */
4318
4319         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4320                 ret = rbd_dev_v2_parent_info(rbd_dev);
4321                 if (ret < 0)
4322                         goto out_err;
4323         }
4324
4325         /* crypto and compression type aren't (yet) supported for v2 images */
4326
4327         rbd_dev->header.crypt_type = 0;
4328         rbd_dev->header.comp_type = 0;
4329
4330         /* Get the snapshot context, plus the header version */
4331
4332         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4333         if (ret)
4334                 goto out_err;
4335         rbd_dev->header.obj_version = ver;
4336
4337         rbd_dev->image_format = 2;
4338
4339         dout("discovered version 2 image, header name is %s\n",
4340                 rbd_dev->header_name);
4341
4342         return 0;
4343 out_err:
4344         rbd_dev->parent_overlap = 0;
4345         rbd_spec_put(rbd_dev->parent_spec);
4346         rbd_dev->parent_spec = NULL;
4347         kfree(rbd_dev->header_name);
4348         rbd_dev->header_name = NULL;
4349         kfree(rbd_dev->header.object_prefix);
4350         rbd_dev->header.object_prefix = NULL;
4351
4352         return ret;
4353 }
4354
4355 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4356 {
4357         struct rbd_device *parent = NULL;
4358         struct rbd_spec *parent_spec = NULL;
4359         struct rbd_client *rbdc = NULL;
4360         int ret;
4361
4362         /* no need to lock here, as rbd_dev is not registered yet */
4363         ret = rbd_dev_snaps_update(rbd_dev);
4364         if (ret)
4365                 return ret;
4366
4367         ret = rbd_dev_probe_update_spec(rbd_dev);
4368         if (ret)
4369                 goto err_out_snaps;
4370
4371         ret = rbd_dev_set_mapping(rbd_dev);
4372         if (ret)
4373                 goto err_out_snaps;
4374
4375         /* generate unique id: find highest unique id, add one */
4376         rbd_dev_id_get(rbd_dev);
4377
4378         /* Fill in the device name, now that we have its id. */
4379         BUILD_BUG_ON(DEV_NAME_LEN
4380                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4381         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4382
4383         /* Get our block major device number. */
4384
4385         ret = register_blkdev(0, rbd_dev->name);
4386         if (ret < 0)
4387                 goto err_out_id;
4388         rbd_dev->major = ret;
4389
4390         /* Set up the blkdev mapping. */
4391
4392         ret = rbd_init_disk(rbd_dev);
4393         if (ret)
4394                 goto err_out_blkdev;
4395
4396         ret = rbd_bus_add_dev(rbd_dev);
4397         if (ret)
4398                 goto err_out_disk;
4399
4400         /*
4401          * At this point cleanup in the event of an error is the job
4402          * of the sysfs code (initiated by rbd_bus_del_dev()).
4403          */
4404         /* Probe the parent if there is one */
4405
4406         if (rbd_dev->parent_spec) {
4407                 /*
4408                  * We need to pass a reference to the client and the
4409                  * parent spec when creating the parent rbd_dev.
4410                  * Images related by parent/child relationships
4411                  * always share both.
4412                  */
4413                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4414                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4415
4416                 parent = rbd_dev_create(rbdc, parent_spec);
4417                 if (!parent) {
4418                         ret = -ENOMEM;
4419                         goto err_out_spec;
4420                 }
4421                 rbdc = NULL;            /* parent now owns reference */
4422                 parent_spec = NULL;     /* parent now owns reference */
4423                 ret = rbd_dev_probe(parent);
4424                 if (ret < 0)
4425                         goto err_out_parent;
4426                 rbd_dev->parent = parent;
4427         }
4428
4429         down_write(&rbd_dev->header_rwsem);
4430         ret = rbd_dev_snaps_register(rbd_dev);
4431         up_write(&rbd_dev->header_rwsem);
4432         if (ret)
4433                 goto err_out_bus;
4434
4435         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4436         if (ret)
4437                 goto err_out_bus;
4438
4439         /* Everything's ready.  Announce the disk to the world. */
4440
4441         add_disk(rbd_dev->disk);
4442
4443         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4444                 (unsigned long long) rbd_dev->mapping.size);
4445
4446         return ret;
4447
4448 err_out_parent:
4449         rbd_dev_destroy(parent);
4450 err_out_spec:
4451         rbd_spec_put(parent_spec);
4452         rbd_put_client(rbdc);
4453 err_out_bus:
4454         /* this will also clean up rest of rbd_dev stuff */
4455
4456         rbd_bus_del_dev(rbd_dev);
4457
4458         return ret;
4459 err_out_disk:
4460         rbd_free_disk(rbd_dev);
4461 err_out_blkdev:
4462         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4463 err_out_id:
4464         rbd_dev_id_put(rbd_dev);
4465 err_out_snaps:
4466         rbd_remove_all_snaps(rbd_dev);
4467
4468         return ret;
4469 }
4470
4471 /*
4472  * Probe for the existence of the header object for the given rbd
4473  * device.  For format 2 images this includes determining the image
4474  * id.
4475  */
4476 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4477 {
4478         int ret;
4479
4480         /*
4481          * Get the id from the image id object.  If it's not a
4482          * format 2 image, we'll get ENOENT back, and we'll assume
4483          * it's a format 1 image.
4484          */
4485         ret = rbd_dev_image_id(rbd_dev);
4486         if (ret)
4487                 ret = rbd_dev_v1_probe(rbd_dev);
4488         else
4489                 ret = rbd_dev_v2_probe(rbd_dev);
4490         if (ret) {
4491                 dout("probe failed, returning %d\n", ret);
4492
4493                 return ret;
4494         }
4495
4496         ret = rbd_dev_probe_finish(rbd_dev);
4497         if (ret)
4498                 rbd_header_free(&rbd_dev->header);
4499
4500         return ret;
4501 }

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64) rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev) {
                /* rc still holds the pool id here; report the real error */
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        return count;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t) rc;
}
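
/*
 * Usage sketch (see Documentation/ABI/testing/sysfs-bus-rbd for the
 * authoritative format; the monitor address, credentials, pool and
 * image names below are made up):
 *
 *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *              > /sys/bus/rbd/add
 *
 * On success the write consumes the whole buffer (returns count) and
 * the mapped image shows up as a block device named rbd<id>.
 */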

/*
 * Find the mapped rbd device with the given id, or NULL if there is
 * none.  The device list lock is dropped before returning, so the
 * caller (rbd_remove()) relies on ctl_mutex to keep the result from
 * going away underneath it.
 */
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

/*
 * Release callback for an rbd device, invoked by the driver core
 * once the last reference to the struct device is dropped.
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}

/* Tear down one device: drop its snapshots, then delete it from the bus. */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
}

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;

        while (rbd_dev->parent_spec) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Walk down to the most distant ancestor (the parent
                 * that itself has no parent) and remove it, repeating
                 * until rbd_dev's own parent is gone.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                __rbd_remove(second);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
                first->parent_overlap = 0;
                first->parent = NULL;
        }
        __rbd_remove(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
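
/*
 * Usage sketch (device id made up): unmap the device with id 2,
 * assuming nothing holds /dev/rbd2 open:
 *
 *   # echo 2 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device has users, and with
 * -ENOENT if no mapped device has that id.
 */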

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}
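
/*
 * Sketch of the resulting layout: once the bus is registered, user
 * space drives the driver entirely through sysfs.  /sys/bus/rbd/add
 * and /sys/bus/rbd/remove are backed by the store functions above,
 * and each mapped image appears under /sys/bus/rbd/devices/<id>/.
 */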

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}
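
/*
 * Loading sketch: with the module built, "modprobe rbd" runs
 * rbd_init() and, on success, logs the "loaded ..." line and exposes
 * the /sys/bus/rbd control files described above.
 */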

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");