rbd: new request tracking code
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Largest value representable in each fixed-width unsigned type,
 * obtained by truncating an all-ones constant to that width.
 * It might be useful to have these defined elsewhere.
 */

#define U8_MAX  ((u8)   (~0U))
#define U16_MAX ((u16)  (~0U))
#define U32_MAX ((u32)  (~0U))
#define U64_MAX ((u64)  (~0ULL))

/* Short and long driver names; the short one prefixes rbd_warn() output */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* Longest snapshot name that still fits in NAME_MAX once prefixed */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Snapshot name that stands for the image head (snap id CEPH_NOSNAP) */
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/*
 * Features supported by this (client software) implementation.
 * No bits are set, so RBD_FEATURE_LAYERING is not included here.
 */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
100  */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* leading part of every data object name */
        u64 features;           /* feature bits; forced to 0 for v1 images */
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;        /* copied verbatim from the on-disk options */
        __u8 comp_type;         /* copied verbatim from the on-disk options */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* image size in bytes */
        struct ceph_snap_context *snapc;        /* snapshot ids and seq */
        char *snap_names;       /* raw copy of the on-disk name buffer */
        u64 *snap_sizes;        /* image size of each snapshot, by index */

        u64 obj_version;
};
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
struct rbd_spec {
        /* Each id below is paired with its human-readable name */
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;    /* may be NULL, see rbd_warn() */

        u64             snap_id;        /* CEPH_NOSNAP when mapping the head */
        char            *snap_name;

        struct kref     kref;           /* reference count for this spec */
};
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
158  */
struct rbd_client {
        struct ceph_client      *client;        /* underlying libceph client */
        struct kref             kref;           /* dropped via rbd_put_client() */
        struct list_head        node;           /* entry in rbd_client_list */
};
164
165 /*
166  * a request completion status
167  */
struct rbd_req_status {
        int done;       /* presumably non-zero once the request finished */
        s32 rc;         /* presumably the request's result code */
        u64 bytes;      /* presumably the number of bytes completed */
};
173
174 /*
175  * a collection of requests
176  */
struct rbd_req_coll {
        int                     total;          /* presumably entries in status[] */
        int                     num_done;
        struct kref             kref;           /* freed by rbd_coll_release() */
        struct rbd_req_status   status[0];      /* trailing variable-length array */
};
183
/* Completion callback invoked with the image request it belongs to */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

/* Sentinel for rbd_obj_request.which; presumably "not on any list" */
#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Completion callback invoked with the object request it belongs to */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Kind of data attached to an object request; only bio lists exist so far */
enum obj_request_type { OBJ_REQUEST_BIO };      /* More types to come */
193
/* A request aimed at a single named object, optionally part of an image request */
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */

        struct rbd_img_request  *img_request;
        struct list_head        links;          /* img_request->obj_requests */
        u32                     which;          /* position in image request list */

        enum obj_request_type   type;           /* selects the data member below */
        struct bio              *bio_list;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        s32                     result;
        atomic_t                done;

        rbd_obj_callback_t      callback;

        struct kref             kref;
};
217
/* A request against a byte range of the image, split into object requests */
struct rbd_img_request {
        struct request          *rq;
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        bool                    write_request;  /* false for read */
        /* Which member is meaningful depends on write_request */
        union {
                struct ceph_snap_context *snapc;        /* for writes */
                u64             snap_id;                /* for reads */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};
237
/*
 * Iterators over the object requests linked on an image request.
 *
 * ireq is the struct rbd_img_request pointer, oreq the cursor of type
 * struct rbd_obj_request pointer.  ireq is parenthesized so that any
 * pointer-valued expression can be passed, not just a plain identifier.
 * Note that the _safe variant walks the list in reverse order.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
244
245 /*
246  * a single io request
247  */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;
        int                     coll_index;     /* presumably index into coll->status[] */
        struct rbd_req_coll     *coll;          /* collection this request is part of */
};
256
/* One snapshot of an image, linked on rbd_device.snaps */
struct rbd_snap {
        struct  device          dev;
        const char              *name;          /* matched by snap_by_name() */
        u64                     size;           /* becomes mapping.size when mapped */
        struct list_head        node;           /* entry in rbd_device.snaps */
        u64                     id;             /* matched by rbd_snap_name() */
        u64                     features;       /* becomes mapping.features when mapped */
};
265
/* Properties of whatever is currently mapped: the image head or one snapshot */
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;      /* forced true when a snapshot is mapped */
};
271
272 /*
273  * a single device
274  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        atomic_t                exists;         /* set to 1 once the mapping is chosen */
        struct rbd_spec         *spec;          /* identity of the mapped image */

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        struct rbd_spec         *parent_spec;   /* non-null for a layered child */
        u64                     parent_overlap;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* changed under ctl_mutex in open/release */
};
316
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
/* presumably guards rbd_dev_list; no user is visible in this section */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
/* Held around every walk or update of rbd_client_list */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for functions defined later in the file */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* Store handlers for the bus "add" and "remove" attributes */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
335
/*
 * Bus-level sysfs attributes.  Both are owner-writable only (S_IWUSR)
 * and have no show handler; writes go to rbd_add() / rbd_remove().
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* The "rbd" bus, carrying the attributes above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
346
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the device
 * below is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated device named "rbd" */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
355
356 static __printf(2, 3)
357 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
358 {
359         struct va_format vaf;
360         va_list args;
361
362         va_start(args, fmt);
363         vaf.fmt = fmt;
364         vaf.va = &args;
365
366         if (!rbd_dev)
367                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
368         else if (rbd_dev->disk)
369                 printk(KERN_WARNING "%s: %s: %pV\n",
370                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
371         else if (rbd_dev->spec && rbd_dev->spec->image_name)
372                 printk(KERN_WARNING "%s: image %s: %pV\n",
373                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
374         else if (rbd_dev->spec && rbd_dev->spec->image_id)
375                 printk(KERN_WARNING "%s: id %s: %pV\n",
376                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
377         else    /* punt */
378                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
379                         RBD_DRV_NAME, rbd_dev, &vaf);
380         va_end(args);
381 }
382
/*
 * rbd_assert(expr): when RBD_DEBUG is defined, log the failing
 * expression with its function and line and then BUG(); otherwise
 * expand to a no-op that does not evaluate expr.
 *
 * The debug form is wrapped in do { } while (0) so that it behaves as
 * a single statement: a bare "if" would silently capture a following
 * "else" when the macro is used as the body of an unbraced if.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
395
/* Forward declarations; the definitions are outside this section */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
398
399 static int rbd_open(struct block_device *bdev, fmode_t mode)
400 {
401         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
402
403         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
404                 return -EROFS;
405
406         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
407         (void) get_device(&rbd_dev->dev);
408         set_device_ro(bdev, rbd_dev->mapping.read_only);
409         rbd_dev->open_count++;
410         mutex_unlock(&ctl_mutex);
411
412         return 0;
413 }
414
415 static int rbd_release(struct gendisk *disk, fmode_t mode)
416 {
417         struct rbd_device *rbd_dev = disk->private_data;
418
419         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
420         rbd_assert(rbd_dev->open_count > 0);
421         rbd_dev->open_count--;
422         put_device(&rbd_dev->dev);
423         mutex_unlock(&ctl_mutex);
424
425         return 0;
426 }
427
/* Block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
433
434 /*
435  * Initialize an rbd client instance.
436  * We own *ceph_opts.
437  */
438 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
439 {
440         struct rbd_client *rbdc;
441         int ret = -ENOMEM;
442
443         dout("rbd_client_create\n");
444         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
445         if (!rbdc)
446                 goto out_opt;
447
448         kref_init(&rbdc->kref);
449         INIT_LIST_HEAD(&rbdc->node);
450
451         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
452
453         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
454         if (IS_ERR(rbdc->client))
455                 goto out_mutex;
456         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
457
458         ret = ceph_open_session(rbdc->client);
459         if (ret < 0)
460                 goto out_err;
461
462         spin_lock(&rbd_client_list_lock);
463         list_add_tail(&rbdc->node, &rbd_client_list);
464         spin_unlock(&rbd_client_list_lock);
465
466         mutex_unlock(&ctl_mutex);
467
468         dout("rbd_client_create created %p\n", rbdc);
469         return rbdc;
470
471 out_err:
472         ceph_destroy_client(rbdc->client);
473 out_mutex:
474         mutex_unlock(&ctl_mutex);
475         kfree(rbdc);
476 out_opt:
477         if (ceph_opts)
478                 ceph_destroy_options(ceph_opts);
479         return ERR_PTR(ret);
480 }
481
482 /*
483  * Find a ceph client with specific addr and configuration.  If
484  * found, bump its reference count.
485  */
486 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
487 {
488         struct rbd_client *client_node;
489         bool found = false;
490
491         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
492                 return NULL;
493
494         spin_lock(&rbd_client_list_lock);
495         list_for_each_entry(client_node, &rbd_client_list, node) {
496                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
497                         kref_get(&client_node->kref);
498                         found = true;
499                         break;
500                 }
501         }
502         spin_unlock(&rbd_client_list_lock);
503
504         return found ? client_node : NULL;
505 }
506
507 /*
508  * mount options
509  */
/*
 * Option tokens, grouped by argument kind.  The Opt_last_* markers
 * delimit the groups; parse_rbd_opts_token() classifies a token by
 * comparing it against them.  The int and string groups are empty.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

/* Maps option strings to the tokens above; terminated by {-1, NULL} */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

/* Parsed rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false
537
538 static int parse_rbd_opts_token(char *c, void *private)
539 {
540         struct rbd_options *rbd_opts = private;
541         substring_t argstr[MAX_OPT_ARGS];
542         int token, intval, ret;
543
544         token = match_token(c, rbd_opts_tokens, argstr);
545         if (token < 0)
546                 return -EINVAL;
547
548         if (token < Opt_last_int) {
549                 ret = match_int(&argstr[0], &intval);
550                 if (ret < 0) {
551                         pr_err("bad mount option arg (not int) "
552                                "at '%s'\n", c);
553                         return ret;
554                 }
555                 dout("got int token %d val %d\n", token, intval);
556         } else if (token > Opt_last_int && token < Opt_last_string) {
557                 dout("got string token %d val %s\n", token,
558                      argstr[0].from);
559         } else if (token > Opt_last_string && token < Opt_last_bool) {
560                 dout("got Boolean token %d\n", token);
561         } else {
562                 dout("got token %d\n", token);
563         }
564
565         switch (token) {
566         case Opt_read_only:
567                 rbd_opts->read_only = true;
568                 break;
569         case Opt_read_write:
570                 rbd_opts->read_only = false;
571                 break;
572         default:
573                 rbd_assert(false);
574                 break;
575         }
576         return 0;
577 }
578
/*
 * Return a client matching ceph_opts, reusing an existing one when
 * possible and creating a new one otherwise.  Either way ceph_opts is
 * consumed: it is destroyed here when an existing client is reused,
 * and handed to rbd_client_create() when not.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* using an existing client */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
595
/*
 * Destroy ceph client
 *
 * kref release callback, reached through kref_put() in
 * rbd_put_client().  This function takes rbd_client_list_lock itself
 * to unlink the client, so the caller must NOT already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        /* Unlink from rbd_client_list so rbd_client_find() can't see it */
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
613
614 /*
615  * Drop reference to ceph client node. If it's not referenced anymore, release
616  * it.
617  */
618 static void rbd_put_client(struct rbd_client *rbdc)
619 {
620         if (rbdc)
621                 kref_put(&rbdc->kref, rbd_client_release);
622 }
623
624 /*
625  * Destroy requests collection
626  */
627 static void rbd_coll_release(struct kref *kref)
628 {
629         struct rbd_req_coll *coll =
630                 container_of(kref, struct rbd_req_coll, kref);
631
632         dout("rbd_coll_release %p\n", coll);
633         kfree(coll);
634 }
635
636 static bool rbd_image_format_valid(u32 image_format)
637 {
638         return image_format == 1 || image_format == 2;
639 }
640
641 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
642 {
643         size_t size;
644         u32 snap_count;
645
646         /* The header has to start with the magic rbd header text */
647         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
648                 return false;
649
650         /* The bio layer requires at least sector-sized I/O */
651
652         if (ondisk->options.order < SECTOR_SHIFT)
653                 return false;
654
655         /* If we use u64 in a few spots we may be able to loosen this */
656
657         if (ondisk->options.order > 8 * sizeof (int) - 1)
658                 return false;
659
660         /*
661          * The size of a snapshot header has to fit in a size_t, and
662          * that limits the number of snapshots.
663          */
664         snap_count = le32_to_cpu(ondisk->snap_count);
665         size = SIZE_MAX - sizeof (struct ceph_snap_context);
666         if (snap_count > size / sizeof (__le64))
667                 return false;
668
669         /*
670          * Not only that, but the size of the entire the snapshot
671          * header must also be representable in a size_t.
672          */
673         size -= snap_count * sizeof (__le64);
674         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
675                 return false;
676
677         return true;
678 }
679
680 /*
681  * Create a new header structure, translate header format from the on-disk
682  * header.
683  */
684 static int rbd_header_from_disk(struct rbd_image_header *header,
685                                  struct rbd_image_header_ondisk *ondisk)
686 {
687         u32 snap_count;
688         size_t len;
689         size_t size;
690         u32 i;
691
692         memset(header, 0, sizeof (*header));
693
694         snap_count = le32_to_cpu(ondisk->snap_count);
695
696         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
697         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
698         if (!header->object_prefix)
699                 return -ENOMEM;
700         memcpy(header->object_prefix, ondisk->object_prefix, len);
701         header->object_prefix[len] = '\0';
702
703         if (snap_count) {
704                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
705
706                 /* Save a copy of the snapshot names */
707
708                 if (snap_names_len > (u64) SIZE_MAX)
709                         return -EIO;
710                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
711                 if (!header->snap_names)
712                         goto out_err;
713                 /*
714                  * Note that rbd_dev_v1_header_read() guarantees
715                  * the ondisk buffer we're working with has
716                  * snap_names_len bytes beyond the end of the
717                  * snapshot id array, this memcpy() is safe.
718                  */
719                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
720                         snap_names_len);
721
722                 /* Record each snapshot's size */
723
724                 size = snap_count * sizeof (*header->snap_sizes);
725                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
726                 if (!header->snap_sizes)
727                         goto out_err;
728                 for (i = 0; i < snap_count; i++)
729                         header->snap_sizes[i] =
730                                 le64_to_cpu(ondisk->snaps[i].image_size);
731         } else {
732                 WARN_ON(ondisk->snap_names_len);
733                 header->snap_names = NULL;
734                 header->snap_sizes = NULL;
735         }
736
737         header->features = 0;   /* No features support in v1 images */
738         header->obj_order = ondisk->options.order;
739         header->crypt_type = ondisk->options.crypt_type;
740         header->comp_type = ondisk->options.comp_type;
741
742         /* Allocate and fill in the snapshot context */
743
744         header->image_size = le64_to_cpu(ondisk->image_size);
745         size = sizeof (struct ceph_snap_context);
746         size += snap_count * sizeof (header->snapc->snaps[0]);
747         header->snapc = kzalloc(size, GFP_KERNEL);
748         if (!header->snapc)
749                 goto out_err;
750
751         atomic_set(&header->snapc->nref, 1);
752         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
753         header->snapc->num_snaps = snap_count;
754         for (i = 0; i < snap_count; i++)
755                 header->snapc->snaps[i] =
756                         le64_to_cpu(ondisk->snaps[i].id);
757
758         return 0;
759
760 out_err:
761         kfree(header->snap_sizes);
762         header->snap_sizes = NULL;
763         kfree(header->snap_names);
764         header->snap_names = NULL;
765         kfree(header->object_prefix);
766         header->object_prefix = NULL;
767
768         return -ENOMEM;
769 }
770
771 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
772 {
773         struct rbd_snap *snap;
774
775         if (snap_id == CEPH_NOSNAP)
776                 return RBD_SNAP_HEAD_NAME;
777
778         list_for_each_entry(snap, &rbd_dev->snaps, node)
779                 if (snap_id == snap->id)
780                         return snap->name;
781
782         return NULL;
783 }
784
785 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
786 {
787
788         struct rbd_snap *snap;
789
790         list_for_each_entry(snap, &rbd_dev->snaps, node) {
791                 if (!strcmp(snap_name, snap->name)) {
792                         rbd_dev->spec->snap_id = snap->id;
793                         rbd_dev->mapping.size = snap->size;
794                         rbd_dev->mapping.features = snap->features;
795
796                         return 0;
797                 }
798         }
799
800         return -ENOENT;
801 }
802
803 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
804 {
805         int ret;
806
807         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
808                     sizeof (RBD_SNAP_HEAD_NAME))) {
809                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
810                 rbd_dev->mapping.size = rbd_dev->header.image_size;
811                 rbd_dev->mapping.features = rbd_dev->header.features;
812                 ret = 0;
813         } else {
814                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
815                 if (ret < 0)
816                         goto done;
817                 rbd_dev->mapping.read_only = true;
818         }
819         atomic_set(&rbd_dev->exists, 1);
820 done:
821         return ret;
822 }
823
824 static void rbd_header_free(struct rbd_image_header *header)
825 {
826         kfree(header->object_prefix);
827         header->object_prefix = NULL;
828         kfree(header->snap_sizes);
829         header->snap_sizes = NULL;
830         kfree(header->snap_names);
831         header->snap_names = NULL;
832         ceph_put_snap_context(header->snapc);
833         header->snapc = NULL;
834 }
835
836 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
837 {
838         char *name;
839         u64 segment;
840         int ret;
841
842         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
843         if (!name)
844                 return NULL;
845         segment = offset >> rbd_dev->header.obj_order;
846         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
847                         rbd_dev->header.object_prefix, segment);
848         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
849                 pr_err("error formatting segment name for #%llu (%d)\n",
850                         segment, ret);
851                 kfree(name);
852                 name = NULL;
853         }
854
855         return name;
856 }
857
858 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
859 {
860         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
861
862         return offset & (segment_size - 1);
863 }
864
865 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
866                                 u64 offset, u64 length)
867 {
868         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
869
870         offset &= segment_size - 1;
871
872         rbd_assert(length <= U64_MAX - offset);
873         if (offset + length > segment_size)
874                 length = segment_size - offset;
875
876         return length;
877 }
878
879 static int rbd_get_num_segments(struct rbd_image_header *header,
880                                 u64 ofs, u64 len)
881 {
882         u64 start_seg;
883         u64 end_seg;
884         u64 result;
885
886         if (!len)
887                 return 0;
888         if (len - 1 > U64_MAX - ofs)
889                 return -ERANGE;
890
891         start_seg = ofs >> header->obj_order;
892         end_seg = (ofs + len - 1) >> header->obj_order;
893
894         result = end_seg - start_seg + 1;
895         if (result > (u64) INT_MAX)
896                 return -ERANGE;
897
898         return (int) result;
899 }
900
901 /*
902  * returns the size of an object in the image
903  */
904 static u64 rbd_obj_bytes(struct rbd_image_header *header)
905 {
906         return 1 << header->obj_order;
907 }
908
909 /*
910  * bio helpers
911  */
912
913 static void bio_chain_put(struct bio *chain)
914 {
915         struct bio *tmp;
916
917         while (chain) {
918                 tmp = chain;
919                 chain = chain->bi_next;
920                 bio_put(tmp);
921         }
922 }
923
924 /*
925  * zeros a bio chain, starting at specific offset
926  */
927 static void zero_bio_chain(struct bio *chain, int start_ofs)
928 {
929         struct bio_vec *bv;
930         unsigned long flags;
931         void *buf;
932         int i;
933         int pos = 0;
934
935         while (chain) {
936                 bio_for_each_segment(bv, chain, i) {
937                         if (pos + bv->bv_len > start_ofs) {
938                                 int remainder = max(start_ofs - pos, 0);
939                                 buf = bvec_kmap_irq(bv, &flags);
940                                 memset(buf + remainder, 0,
941                                        bv->bv_len - remainder);
942                                 bvec_kunmap_irq(buf, &flags);
943                         }
944                         pos += bv->bv_len;
945                 }
946
947                 chain = chain->bi_next;
948         }
949 }
950
951 /*
952  * Clone a portion of a bio, starting at the given byte offset
953  * and continuing for the number of bytes indicated.
954  */
955 static struct bio *bio_clone_range(struct bio *bio_src,
956                                         unsigned int offset,
957                                         unsigned int len,
958                                         gfp_t gfpmask)
959 {
960         struct bio_vec *bv;
961         unsigned int resid;
962         unsigned short idx;
963         unsigned int voff;
964         unsigned short end_idx;
965         unsigned short vcnt;
966         struct bio *bio;
967
968         /* Handle the easy case for the caller */
969
970         if (!offset && len == bio_src->bi_size)
971                 return bio_clone(bio_src, gfpmask);
972
973         if (WARN_ON_ONCE(!len))
974                 return NULL;
975         if (WARN_ON_ONCE(len > bio_src->bi_size))
976                 return NULL;
977         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
978                 return NULL;
979
980         /* Find first affected segment... */
981
982         resid = offset;
983         __bio_for_each_segment(bv, bio_src, idx, 0) {
984                 if (resid < bv->bv_len)
985                         break;
986                 resid -= bv->bv_len;
987         }
988         voff = resid;
989
990         /* ...and the last affected segment */
991
992         resid += len;
993         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
994                 if (resid <= bv->bv_len)
995                         break;
996                 resid -= bv->bv_len;
997         }
998         vcnt = end_idx - idx + 1;
999
1000         /* Build the clone */
1001
1002         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1003         if (!bio)
1004                 return NULL;    /* ENOMEM */
1005
1006         bio->bi_bdev = bio_src->bi_bdev;
1007         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1008         bio->bi_rw = bio_src->bi_rw;
1009         bio->bi_flags |= 1 << BIO_CLONED;
1010
1011         /*
1012          * Copy over our part of the bio_vec, then update the first
1013          * and last (or only) entries.
1014          */
1015         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1016                         vcnt * sizeof (struct bio_vec));
1017         bio->bi_io_vec[0].bv_offset += voff;
1018         if (vcnt > 1) {
1019                 bio->bi_io_vec[0].bv_len -= voff;
1020                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1021         } else {
1022                 bio->bi_io_vec[0].bv_len = len;
1023         }
1024
1025         bio->bi_vcnt = vcnt;
1026         bio->bi_size = len;
1027         bio->bi_idx = 0;
1028
1029         return bio;
1030 }
1031
1032 /*
1033  * Clone a portion of a bio chain, starting at the given byte offset
1034  * into the first bio in the source chain and continuing for the
1035  * number of bytes indicated.  The result is another bio chain of
1036  * exactly the given length, or a null pointer on error.
1037  *
1038  * The bio_src and offset parameters are both in-out.  On entry they
1039  * refer to the first source bio and the offset into that bio where
1040  * the start of data to be cloned is located.
1041  *
1042  * On return, bio_src is updated to refer to the bio in the source
1043  * chain that contains first un-cloned byte, and *offset will
1044  * contain the offset of that byte within that bio.
1045  */
1046 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1047                                         unsigned int *offset,
1048                                         unsigned int len,
1049                                         gfp_t gfpmask)
1050 {
1051         struct bio *bi = *bio_src;
1052         unsigned int off = *offset;
1053         struct bio *chain = NULL;
1054         struct bio **end;
1055
1056         /* Build up a chain of clone bios up to the limit */
1057
1058         if (!bi || off >= bi->bi_size || !len)
1059                 return NULL;            /* Nothing to clone */
1060
1061         end = &chain;
1062         while (len) {
1063                 unsigned int bi_size;
1064                 struct bio *bio;
1065
1066                 if (!bi) {
1067                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1068                         goto out_err;   /* EINVAL; ran out of bio's */
1069                 }
1070                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1071                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1072                 if (!bio)
1073                         goto out_err;   /* ENOMEM */
1074
1075                 *end = bio;
1076                 end = &bio->bi_next;
1077
1078                 off += bi_size;
1079                 if (off == bi->bi_size) {
1080                         bi = bi->bi_next;
1081                         off = 0;
1082                 }
1083                 len -= bi_size;
1084         }
1085         *bio_src = bi;
1086         *offset = off;
1087
1088         return chain;
1089 out_err:
1090         bio_chain_put(chain);
1091
1092         return NULL;
1093 }
1094
1095 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1096 {
1097         kref_get(&obj_request->kref);
1098 }
1099
1100 static void rbd_obj_request_destroy(struct kref *kref);
1101 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1102 {
1103         rbd_assert(obj_request != NULL);
1104         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1105 }
1106
1107 static void rbd_img_request_get(struct rbd_img_request *img_request)
1108 {
1109         kref_get(&img_request->kref);
1110 }
1111
1112 static void rbd_img_request_destroy(struct kref *kref);
1113 static void rbd_img_request_put(struct rbd_img_request *img_request)
1114 {
1115         rbd_assert(img_request != NULL);
1116         kref_put(&img_request->kref, rbd_img_request_destroy);
1117 }
1118
1119 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1120                                         struct rbd_obj_request *obj_request)
1121 {
1122         rbd_obj_request_get(obj_request);
1123         obj_request->img_request = img_request;
1124         list_add_tail(&obj_request->links, &img_request->obj_requests);
1125         obj_request->which = img_request->obj_request_count++;
1126         rbd_assert(obj_request->which != BAD_WHICH);
1127 }
1128
1129 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1130                                         struct rbd_obj_request *obj_request)
1131 {
1132         rbd_assert(obj_request->which != BAD_WHICH);
1133         obj_request->which = BAD_WHICH;
1134         list_del(&obj_request->links);
1135         rbd_assert(obj_request->img_request == img_request);
1136         obj_request->callback = NULL;
1137         obj_request->img_request = NULL;
1138         rbd_obj_request_put(obj_request);
1139 }
1140
1141 static bool obj_request_type_valid(enum obj_request_type type)
1142 {
1143         switch (type) {
1144         case OBJ_REQUEST_BIO:
1145                 return true;
1146         default:
1147                 return false;
1148         }
1149 }
1150
1151 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1152 {
1153         struct ceph_osd_req_op *op;
1154         va_list args;
1155         size_t size;
1156
1157         op = kzalloc(sizeof (*op), GFP_NOIO);
1158         if (!op)
1159                 return NULL;
1160         op->op = opcode;
1161         va_start(args, opcode);
1162         switch (opcode) {
1163         case CEPH_OSD_OP_READ:
1164         case CEPH_OSD_OP_WRITE:
1165                 /* rbd_osd_req_op_create(READ, offset, length) */
1166                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1167                 op->extent.offset = va_arg(args, u64);
1168                 op->extent.length = va_arg(args, u64);
1169                 if (opcode == CEPH_OSD_OP_WRITE)
1170                         op->payload_len = op->extent.length;
1171                 break;
1172         case CEPH_OSD_OP_CALL:
1173                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1174                 op->cls.class_name = va_arg(args, char *);
1175                 size = strlen(op->cls.class_name);
1176                 rbd_assert(size <= (size_t) U8_MAX);
1177                 op->cls.class_len = size;
1178                 op->payload_len = size;
1179
1180                 op->cls.method_name = va_arg(args, char *);
1181                 size = strlen(op->cls.method_name);
1182                 rbd_assert(size <= (size_t) U8_MAX);
1183                 op->cls.method_len = size;
1184                 op->payload_len += size;
1185
1186                 op->cls.argc = 0;
1187                 op->cls.indata = va_arg(args, void *);
1188                 size = va_arg(args, size_t);
1189                 rbd_assert(size <= (size_t) U32_MAX);
1190                 op->cls.indata_len = (u32) size;
1191                 op->payload_len += size;
1192                 break;
1193         case CEPH_OSD_OP_NOTIFY_ACK:
1194         case CEPH_OSD_OP_WATCH:
1195                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1196                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1197                 op->watch.cookie = va_arg(args, u64);
1198                 op->watch.ver = va_arg(args, u64);
1199                 op->watch.ver = cpu_to_le64(op->watch.ver);
1200                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1201                         op->watch.flag = (u8) 1;
1202                 break;
1203         default:
1204                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1205                 kfree(op);
1206                 op = NULL;
1207                 break;
1208         }
1209         va_end(args);
1210
1211         return op;
1212 }
1213
/*
 * Free an op obtained from rbd_osd_req_op_create().  Only the op
 * structure itself is freed, not anything it points to.
 */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
        kfree(op);
}
1218
1219 static void rbd_coll_end_req_index(struct request *rq,
1220                                    struct rbd_req_coll *coll,
1221                                    int index,
1222                                    s32 ret, u64 len)
1223 {
1224         struct request_queue *q;
1225         int min, max, i;
1226
1227         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1228              coll, index, (int)ret, (unsigned long long)len);
1229
1230         if (!rq)
1231                 return;
1232
1233         if (!coll) {
1234                 blk_end_request(rq, ret, len);
1235                 return;
1236         }
1237
1238         q = rq->q;
1239
1240         spin_lock_irq(q->queue_lock);
1241         coll->status[index].done = 1;
1242         coll->status[index].rc = ret;
1243         coll->status[index].bytes = len;
1244         max = min = coll->num_done;
1245         while (max < coll->total && coll->status[max].done)
1246                 max++;
1247
1248         for (i = min; i<max; i++) {
1249                 __blk_end_request(rq, (int)coll->status[i].rc,
1250                                   coll->status[i].bytes);
1251                 coll->num_done++;
1252                 kref_put(&coll->kref, rbd_coll_release);
1253         }
1254         spin_unlock_irq(q->queue_lock);
1255 }
1256
1257 static void rbd_coll_end_req(struct rbd_request *rbd_req,
1258                              s32 ret, u64 len)
1259 {
1260         rbd_coll_end_req_index(rbd_req->rq,
1261                                 rbd_req->coll, rbd_req->coll_index,
1262                                 ret, len);
1263 }
1264
1265 /*
1266  * Send ceph osd request
1267  */
1268 static int rbd_do_request(struct request *rq,
1269                           struct rbd_device *rbd_dev,
1270                           struct ceph_snap_context *snapc,
1271                           u64 snapid,
1272                           const char *object_name, u64 ofs, u64 len,
1273                           struct bio *bio,
1274                           struct page **pages,
1275                           int num_pages,
1276                           int flags,
1277                           struct ceph_osd_req_op *op,
1278                           struct rbd_req_coll *coll,
1279                           int coll_index,
1280                           void (*rbd_cb)(struct ceph_osd_request *,
1281                                          struct ceph_msg *),
1282                           u64 *ver)
1283 {
1284         struct ceph_osd_client *osdc;
1285         struct ceph_osd_request *osd_req;
1286         struct rbd_request *rbd_req = NULL;
1287         struct timespec mtime = CURRENT_TIME;
1288         int ret;
1289
1290         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1291                 object_name, (unsigned long long) ofs,
1292                 (unsigned long long) len, coll, coll_index);
1293
1294         osdc = &rbd_dev->rbd_client->client->osdc;
1295         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
1296         if (!osd_req)
1297                 return -ENOMEM;
1298
1299         osd_req->r_flags = flags;
1300         osd_req->r_pages = pages;
1301         if (bio) {
1302                 osd_req->r_bio = bio;
1303                 bio_get(osd_req->r_bio);
1304         }
1305
1306         if (coll) {
1307                 ret = -ENOMEM;
1308                 rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
1309                 if (!rbd_req)
1310                         goto done_osd_req;
1311
1312                 rbd_req->rq = rq;
1313                 rbd_req->bio = bio;
1314                 rbd_req->pages = pages;
1315                 rbd_req->len = len;
1316                 rbd_req->coll = coll;
1317                 rbd_req->coll_index = coll_index;
1318         }
1319
1320         osd_req->r_callback = rbd_cb;
1321         osd_req->r_priv = rbd_req;
1322
1323         strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1324         osd_req->r_oid_len = strlen(osd_req->r_oid);
1325
1326         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1327         osd_req->r_num_pages = calc_pages_for(ofs, len);
1328         osd_req->r_page_alignment = ofs & ~PAGE_MASK;
1329
1330         ceph_osdc_build_request(osd_req, ofs, len, 1, op,
1331                                 snapc, snapid, &mtime);
1332
1333         if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
1334                 ceph_osdc_set_request_linger(osdc, osd_req);
1335                 rbd_dev->watch_request = osd_req;
1336         }
1337
1338         ret = ceph_osdc_start_request(osdc, osd_req, false);
1339         if (ret < 0)
1340                 goto done_err;
1341
1342         if (!rbd_cb) {
1343                 u64 version;
1344
1345                 ret = ceph_osdc_wait_request(osdc, osd_req);
1346                 version = le64_to_cpu(osd_req->r_reassert_version.version);
1347                 if (ver)
1348                         *ver = version;
1349                 dout("reassert_ver=%llu\n", (unsigned long long) version);
1350                 ceph_osdc_put_request(osd_req);
1351         }
1352         return ret;
1353
1354 done_err:
1355         if (bio)
1356                 bio_chain_put(osd_req->r_bio);
1357         kfree(rbd_req);
1358 done_osd_req:
1359         ceph_osdc_put_request(osd_req);
1360
1361         return ret;
1362 }
1363
1364 /*
1365  * Ceph osd op callback
1366  */
1367 static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
1368 {
1369         struct rbd_request *rbd_req = osd_req->r_priv;
1370         struct ceph_osd_reply_head *replyhead;
1371         struct ceph_osd_op *op;
1372         s32 rc;
1373         u64 bytes;
1374         int read_op;
1375
1376         /* parse reply */
1377         replyhead = msg->front.iov_base;
1378         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1379         op = (void *)(replyhead + 1);
1380         rc = (s32)le32_to_cpu(replyhead->result);
1381         bytes = le64_to_cpu(op->extent.length);
1382         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1383
1384         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1385                 (unsigned long long) bytes, read_op, (int) rc);
1386
1387         if (rc == (s32)-ENOENT && read_op) {
1388                 zero_bio_chain(rbd_req->bio, 0);
1389                 rc = 0;
1390         } else if (rc == 0 && read_op && bytes < rbd_req->len) {
1391                 zero_bio_chain(rbd_req->bio, bytes);
1392                 bytes = rbd_req->len;
1393         }
1394
1395         rbd_coll_end_req(rbd_req, rc, bytes);
1396
1397         if (rbd_req->bio)
1398                 bio_chain_put(rbd_req->bio);
1399
1400         ceph_osdc_put_request(osd_req);
1401         kfree(rbd_req);
1402 }
1403
/*
 * Minimal osd request callback: the reply message is ignored and the
 * osd request is simply put.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        ceph_osdc_put_request(osd_req);
}
1409
1410 /*
1411  * Do a synchronous ceph osd operation
1412  */
1413 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1414                            int flags,
1415                            struct ceph_osd_req_op *op,
1416                            const char *object_name,
1417                            u64 ofs, u64 inbound_size,
1418                            char *inbound,
1419                            u64 *ver)
1420 {
1421         int ret;
1422         struct page **pages;
1423         int num_pages;
1424
1425         rbd_assert(op != NULL);
1426
1427         num_pages = calc_pages_for(ofs, inbound_size);
1428         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1429         if (IS_ERR(pages))
1430                 return PTR_ERR(pages);
1431
1432         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1433                           object_name, ofs, inbound_size, NULL,
1434                           pages, num_pages,
1435                           flags,
1436                           op,
1437                           NULL, 0,
1438                           NULL,
1439                           ver);
1440         if (ret < 0)
1441                 goto done;
1442
1443         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1444                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1445
1446 done:
1447         ceph_release_page_vector(pages, num_pages);
1448         return ret;
1449 }
1450
1451 /*
1452  * Do an asynchronous ceph osd operation
1453  */
1454 static int rbd_do_op(struct request *rq,
1455                      struct rbd_device *rbd_dev,
1456                      struct ceph_snap_context *snapc,
1457                      u64 ofs, u64 len,
1458                      struct bio *bio,
1459                      struct rbd_req_coll *coll,
1460                      int coll_index)
1461 {
1462         const char *seg_name;
1463         u64 seg_ofs;
1464         u64 seg_len;
1465         int ret;
1466         struct ceph_osd_req_op *op;
1467         int opcode;
1468         int flags;
1469         u64 snapid;
1470
1471         seg_name = rbd_segment_name(rbd_dev, ofs);
1472         if (!seg_name)
1473                 return -ENOMEM;
1474         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1475         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1476
1477         if (rq_data_dir(rq) == WRITE) {
1478                 opcode = CEPH_OSD_OP_WRITE;
1479                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1480                 snapid = CEPH_NOSNAP;
1481         } else {
1482                 opcode = CEPH_OSD_OP_READ;
1483                 flags = CEPH_OSD_FLAG_READ;
1484                 rbd_assert(!snapc);
1485                 snapid = rbd_dev->spec->snap_id;
1486         }
1487
1488         ret = -ENOMEM;
1489         op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
1490         if (!op)
1491                 goto done;
1492
1493         /* we've taken care of segment sizes earlier when we
1494            cloned the bios. We should never have a segment
1495            truncated at this point */
1496         rbd_assert(seg_len == len);
1497
1498         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1499                              seg_name, seg_ofs, seg_len,
1500                              bio,
1501                              NULL, 0,
1502                              flags,
1503                              op,
1504                              coll, coll_index,
1505                              rbd_req_cb, NULL);
1506         if (ret < 0)
1507                 rbd_coll_end_req_index(rq, coll, coll_index,
1508                                         (s32)ret, seg_len);
1509         rbd_osd_req_op_destroy(op);
1510 done:
1511         kfree(seg_name);
1512         return ret;
1513 }
1514
1515 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1516                                 struct rbd_obj_request *obj_request)
1517 {
1518         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1519 }
1520
1521 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1522 {
1523         if (img_request->callback)
1524                 img_request->callback(img_request);
1525         else
1526                 rbd_img_request_put(img_request);
1527 }
1528
1529 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1530 {
1531         if (obj_request->callback)
1532                 obj_request->callback(obj_request);
1533 }
1534
1535 /*
1536  * Request sync osd read
1537  */
1538 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1539                           const char *object_name,
1540                           u64 ofs, u64 len,
1541                           char *buf,
1542                           u64 *ver)
1543 {
1544         struct ceph_osd_req_op *op;
1545         int ret;
1546
1547         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
1548         if (!op)
1549                 return -ENOMEM;
1550
1551         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1552                                op, object_name, ofs, len, buf, ver);
1553         rbd_osd_req_op_destroy(op);
1554
1555         return ret;
1556 }
1557
1558 /*
1559  * Request sync osd watch
1560  */
1561 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1562                                    u64 ver,
1563                                    u64 notify_id)
1564 {
1565         struct ceph_osd_req_op *op;
1566         int ret;
1567
1568         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1569         if (!op)
1570                 return -ENOMEM;
1571
1572         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1573                           rbd_dev->header_name, 0, 0, NULL,
1574                           NULL, 0,
1575                           CEPH_OSD_FLAG_READ,
1576                           op,
1577                           NULL, 0,
1578                           rbd_simple_req_cb, NULL);
1579
1580         rbd_osd_req_op_destroy(op);
1581
1582         return ret;
1583 }
1584
1585 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1586 {
1587         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1588         u64 hver;
1589         int rc;
1590
1591         if (!rbd_dev)
1592                 return;
1593
1594         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1595                 rbd_dev->header_name, (unsigned long long) notify_id,
1596                 (unsigned int) opcode);
1597         rc = rbd_dev_refresh(rbd_dev, &hver);
1598         if (rc)
1599                 rbd_warn(rbd_dev, "got notification but failed to "
1600                            " update snaps: %d\n", rc);
1601
1602         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1603 }
1604
1605 /*
1606  * Request sync osd watch/unwatch.  The value of "start" determines
1607  * whether a watch request is being initiated or torn down.
1608  */
1609 static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
1610 {
1611         struct ceph_osd_req_op *op;
1612         int ret = 0;
1613
1614         rbd_assert(start ^ !!rbd_dev->watch_event);
1615         rbd_assert(start ^ !!rbd_dev->watch_request);
1616
1617         if (start) {
1618                 struct ceph_osd_client *osdc;
1619
1620                 osdc = &rbd_dev->rbd_client->client->osdc;
1621                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1622                                                 &rbd_dev->watch_event);
1623                 if (ret < 0)
1624                         return ret;
1625         }
1626
1627         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1628                                 rbd_dev->watch_event->cookie,
1629                                 rbd_dev->header.obj_version, start);
1630         if (op)
1631                 ret = rbd_req_sync_op(rbd_dev,
1632                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1633                               op, rbd_dev->header_name,
1634                               0, 0, NULL, NULL);
1635
1636         /* Cancel the event if we're tearing down, or on error */
1637
1638         if (!start || !op || ret < 0) {
1639                 ceph_osdc_cancel_event(rbd_dev->watch_event);
1640                 rbd_dev->watch_event = NULL;
1641         }
1642         rbd_osd_req_op_destroy(op);
1643
1644         return ret;
1645 }
1646
1647 /*
1648  * Synchronous osd object method call
1649  */
1650 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1651                              const char *object_name,
1652                              const char *class_name,
1653                              const char *method_name,
1654                              const char *outbound,
1655                              size_t outbound_size,
1656                              char *inbound,
1657                              size_t inbound_size,
1658                              u64 *ver)
1659 {
1660         struct ceph_osd_req_op *op;
1661         int ret;
1662
1663         /*
1664          * Any input parameters required by the method we're calling
1665          * will be sent along with the class and method names as
1666          * part of the message payload.  That data and its size are
1667          * supplied via the indata and indata_len fields (named from
1668          * the perspective of the server side) in the OSD request
1669          * operation.
1670          */
1671         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1672                                         method_name, outbound, outbound_size);
1673         if (!op)
1674                 return -ENOMEM;
1675
1676         ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
1677                                object_name, 0, inbound_size, inbound,
1678                                ver);
1679
1680         rbd_osd_req_op_destroy(op);
1681
1682         dout("cls_exec returned %d\n", ret);
1683         return ret;
1684 }
1685
1686 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1687 {
1688         struct rbd_req_coll *coll =
1689                         kzalloc(sizeof(struct rbd_req_coll) +
1690                                 sizeof(struct rbd_req_status) * num_reqs,
1691                                 GFP_ATOMIC);
1692
1693         if (!coll)
1694                 return NULL;
1695         coll->total = num_reqs;
1696         kref_init(&coll->kref);
1697         return coll;
1698 }
1699
1700 static int rbd_dev_do_request(struct request *rq,
1701                                 struct rbd_device *rbd_dev,
1702                                 struct ceph_snap_context *snapc,
1703                                 u64 ofs, unsigned int size,
1704                                 struct bio *bio_chain)
1705 {
1706         int num_segs;
1707         struct rbd_req_coll *coll;
1708         unsigned int bio_offset;
1709         int cur_seg = 0;
1710
1711         dout("%s 0x%x bytes at 0x%llx\n",
1712                 rq_data_dir(rq) == WRITE ? "write" : "read",
1713                 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1714
1715         num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1716         if (num_segs <= 0)
1717                 return num_segs;
1718
1719         coll = rbd_alloc_coll(num_segs);
1720         if (!coll)
1721                 return -ENOMEM;
1722
1723         bio_offset = 0;
1724         do {
1725                 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1726                 unsigned int clone_size;
1727                 struct bio *bio_clone;
1728
1729                 BUG_ON(limit > (u64)UINT_MAX);
1730                 clone_size = (unsigned int)limit;
1731                 dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
1732
1733                 kref_get(&coll->kref);
1734
1735                 /* Pass a cloned bio chain via an osd request */
1736
1737                 bio_clone = bio_chain_clone_range(&bio_chain,
1738                                         &bio_offset, clone_size,
1739                                         GFP_ATOMIC);
1740                 if (bio_clone)
1741                         (void)rbd_do_op(rq, rbd_dev, snapc,
1742                                         ofs, clone_size,
1743                                         bio_clone, coll, cur_seg);
1744                 else
1745                         rbd_coll_end_req_index(rq, coll, cur_seg,
1746                                                 (s32)-ENOMEM,
1747                                                 clone_size);
1748                 size -= clone_size;
1749                 ofs += clone_size;
1750
1751                 cur_seg++;
1752         } while (size > 0);
1753         kref_put(&coll->kref, rbd_coll_release);
1754
1755         return 0;
1756 }
1757
1758 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1759                                 struct ceph_osd_op *op)
1760 {
1761         u64 xferred;
1762
1763         /*
1764          * We support a 64-bit length, but ultimately it has to be
1765          * passed to blk_end_request(), which takes an unsigned int.
1766          */
1767         xferred = le64_to_cpu(op->extent.length);
1768         rbd_assert(xferred < (u64) UINT_MAX);
1769         if (obj_request->result == (s32) -ENOENT) {
1770                 zero_bio_chain(obj_request->bio_list, 0);
1771                 obj_request->result = 0;
1772         } else if (xferred < obj_request->length && !obj_request->result) {
1773                 zero_bio_chain(obj_request->bio_list, xferred);
1774                 xferred = obj_request->length;
1775         }
1776         obj_request->xferred = xferred;
1777         atomic_set(&obj_request->done, 1);
1778 }
1779
1780 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1781                                 struct ceph_osd_op *op)
1782 {
1783         obj_request->xferred = le64_to_cpu(op->extent.length);
1784         atomic_set(&obj_request->done, 1);
1785 }
1786
1787 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1788                                 struct ceph_msg *msg)
1789 {
1790         struct rbd_obj_request *obj_request = osd_req->r_priv;
1791         struct ceph_osd_reply_head *reply_head;
1792         struct ceph_osd_op *op;
1793         u32 num_ops;
1794         u16 opcode;
1795
1796         rbd_assert(osd_req == obj_request->osd_req);
1797         rbd_assert(!!obj_request->img_request ^
1798                                 (obj_request->which == BAD_WHICH));
1799
1800         obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1801         reply_head = msg->front.iov_base;
1802         obj_request->result = (s32) le32_to_cpu(reply_head->result);
1803         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1804
1805         num_ops = le32_to_cpu(reply_head->num_ops);
1806         WARN_ON(num_ops != 1);  /* For now */
1807
1808         op = &reply_head->ops[0];
1809         opcode = le16_to_cpu(op->op);
1810         switch (opcode) {
1811         case CEPH_OSD_OP_READ:
1812                 rbd_osd_read_callback(obj_request, op);
1813                 break;
1814         case CEPH_OSD_OP_WRITE:
1815                 rbd_osd_write_callback(obj_request, op);
1816                 break;
1817         default:
1818                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1819                         obj_request->object_name, (unsigned short) opcode);
1820                 break;
1821         }
1822
1823         if (atomic_read(&obj_request->done))
1824                 rbd_obj_request_complete(obj_request);
1825 }
1826
1827 static struct ceph_osd_request *rbd_osd_req_create(
1828                                         struct rbd_device *rbd_dev,
1829                                         bool write_request,
1830                                         struct rbd_obj_request *obj_request,
1831                                         struct ceph_osd_req_op *op)
1832 {
1833         struct rbd_img_request *img_request = obj_request->img_request;
1834         struct ceph_snap_context *snapc = NULL;
1835         struct ceph_osd_client *osdc;
1836         struct ceph_osd_request *osd_req;
1837         struct timespec now;
1838         struct timespec *mtime;
1839         u64 snap_id = CEPH_NOSNAP;
1840         u64 offset = obj_request->offset;
1841         u64 length = obj_request->length;
1842
1843         if (img_request) {
1844                 rbd_assert(img_request->write_request == write_request);
1845                 if (img_request->write_request)
1846                         snapc = img_request->snapc;
1847                 else
1848                         snap_id = img_request->snap_id;
1849         }
1850
1851         /* Allocate and initialize the request, for the single op */
1852
1853         osdc = &rbd_dev->rbd_client->client->osdc;
1854         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1855         if (!osd_req)
1856                 return NULL;    /* ENOMEM */
1857
1858         rbd_assert(obj_request_type_valid(obj_request->type));
1859         switch (obj_request->type) {
1860         case OBJ_REQUEST_BIO:
1861                 rbd_assert(obj_request->bio_list != NULL);
1862                 osd_req->r_bio = obj_request->bio_list;
1863                 bio_get(osd_req->r_bio);
1864                 /* osd client requires "num pages" even for bio */
1865                 osd_req->r_num_pages = calc_pages_for(offset, length);
1866                 break;
1867         }
1868
1869         if (write_request) {
1870                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1871                 now = CURRENT_TIME;
1872                 mtime = &now;
1873         } else {
1874                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1875                 mtime = NULL;   /* not needed for reads */
1876                 offset = 0;     /* These are not used... */
1877                 length = 0;     /* ...for osd read requests */
1878         }
1879
1880         osd_req->r_callback = rbd_osd_req_callback;
1881         osd_req->r_priv = obj_request;
1882
1883         osd_req->r_oid_len = strlen(obj_request->object_name);
1884         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1885         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1886
1887         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1888
1889         /* osd_req will get its own reference to snapc (if non-null) */
1890
1891         ceph_osdc_build_request(osd_req, offset, length, 1, op,
1892                                 snapc, snap_id, mtime);
1893
1894         return osd_req;
1895 }
1896
/* Drop a reference to an osd request obtained from rbd_osd_req_create() */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1901
1902 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1903
1904 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1905                                                 u64 offset, u64 length,
1906                                                 enum obj_request_type type)
1907 {
1908         struct rbd_obj_request *obj_request;
1909         size_t size;
1910         char *name;
1911
1912         rbd_assert(obj_request_type_valid(type));
1913
1914         size = strlen(object_name) + 1;
1915         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1916         if (!obj_request)
1917                 return NULL;
1918
1919         name = (char *)(obj_request + 1);
1920         obj_request->object_name = memcpy(name, object_name, size);
1921         obj_request->offset = offset;
1922         obj_request->length = length;
1923         obj_request->which = BAD_WHICH;
1924         obj_request->type = type;
1925         INIT_LIST_HEAD(&obj_request->links);
1926         atomic_set(&obj_request->done, 0);
1927         kref_init(&obj_request->kref);
1928
1929         return obj_request;
1930 }
1931
1932 static void rbd_obj_request_destroy(struct kref *kref)
1933 {
1934         struct rbd_obj_request *obj_request;
1935
1936         obj_request = container_of(kref, struct rbd_obj_request, kref);
1937
1938         rbd_assert(obj_request->img_request == NULL);
1939         rbd_assert(obj_request->which == BAD_WHICH);
1940
1941         if (obj_request->osd_req)
1942                 rbd_osd_req_destroy(obj_request->osd_req);
1943
1944         rbd_assert(obj_request_type_valid(obj_request->type));
1945         switch (obj_request->type) {
1946         case OBJ_REQUEST_BIO:
1947                 if (obj_request->bio_list)
1948                         bio_chain_put(obj_request->bio_list);
1949                 break;
1950         }
1951
1952         kfree(obj_request);
1953 }
1954
1955 /*
1956  * Caller is responsible for filling in the list of object requests
1957  * that comprises the image request, and the Linux request pointer
1958  * (if there is one).
1959  */
1960 struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1961                                         u64 offset, u64 length,
1962                                         bool write_request)
1963 {
1964         struct rbd_img_request *img_request;
1965         struct ceph_snap_context *snapc = NULL;
1966
1967         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1968         if (!img_request)
1969                 return NULL;
1970
1971         if (write_request) {
1972                 down_read(&rbd_dev->header_rwsem);
1973                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1974                 up_read(&rbd_dev->header_rwsem);
1975                 if (WARN_ON(!snapc)) {
1976                         kfree(img_request);
1977                         return NULL;    /* Shouldn't happen */
1978                 }
1979         }
1980
1981         img_request->rq = NULL;
1982         img_request->rbd_dev = rbd_dev;
1983         img_request->offset = offset;
1984         img_request->length = length;
1985         img_request->write_request = write_request;
1986         if (write_request)
1987                 img_request->snapc = snapc;
1988         else
1989                 img_request->snap_id = rbd_dev->spec->snap_id;
1990         spin_lock_init(&img_request->completion_lock);
1991         img_request->next_completion = 0;
1992         img_request->callback = NULL;
1993         img_request->obj_request_count = 0;
1994         INIT_LIST_HEAD(&img_request->obj_requests);
1995         kref_init(&img_request->kref);
1996
1997         rbd_img_request_get(img_request);       /* Avoid a warning */
1998         rbd_img_request_put(img_request);       /* TEMPORARY */
1999
2000         return img_request;
2001 }
2002
2003 static void rbd_img_request_destroy(struct kref *kref)
2004 {
2005         struct rbd_img_request *img_request;
2006         struct rbd_obj_request *obj_request;
2007         struct rbd_obj_request *next_obj_request;
2008
2009         img_request = container_of(kref, struct rbd_img_request, kref);
2010
2011         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2012                 rbd_img_obj_request_del(img_request, obj_request);
2013
2014         if (img_request->write_request)
2015                 ceph_put_snap_context(img_request->snapc);
2016
2017         kfree(img_request);
2018 }
2019
2020 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
2021                                         struct bio *bio_list)
2022 {
2023         struct rbd_device *rbd_dev = img_request->rbd_dev;
2024         struct rbd_obj_request *obj_request = NULL;
2025         struct rbd_obj_request *next_obj_request;
2026         unsigned int bio_offset;
2027         u64 image_offset;
2028         u64 resid;
2029         u16 opcode;
2030
2031         opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
2032                                               : CEPH_OSD_OP_READ;
2033         bio_offset = 0;
2034         image_offset = img_request->offset;
2035         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
2036         resid = img_request->length;
2037         while (resid) {
2038                 const char *object_name;
2039                 unsigned int clone_size;
2040                 struct ceph_osd_req_op *op;
2041                 u64 offset;
2042                 u64 length;
2043
2044                 object_name = rbd_segment_name(rbd_dev, image_offset);
2045                 if (!object_name)
2046                         goto out_unwind;
2047                 offset = rbd_segment_offset(rbd_dev, image_offset);
2048                 length = rbd_segment_length(rbd_dev, image_offset, resid);
2049                 obj_request = rbd_obj_request_create(object_name,
2050                                                 offset, length,
2051                                                 OBJ_REQUEST_BIO);
2052                 kfree(object_name);     /* object request has its own copy */
2053                 if (!obj_request)
2054                         goto out_unwind;
2055
2056                 rbd_assert(length <= (u64) UINT_MAX);
2057                 clone_size = (unsigned int) length;
2058                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
2059                                                 &bio_offset, clone_size,
2060                                                 GFP_ATOMIC);
2061                 if (!obj_request->bio_list)
2062                         goto out_partial;
2063
2064                 /*
2065                  * Build up the op to use in building the osd
2066                  * request.  Note that the contents of the op are
2067                  * copied by rbd_osd_req_create().
2068                  */
2069                 op = rbd_osd_req_op_create(opcode, offset, length);
2070                 if (!op)
2071                         goto out_partial;
2072                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
2073                                                 img_request->write_request,
2074                                                 obj_request, op);
2075                 rbd_osd_req_op_destroy(op);
2076                 if (!obj_request->osd_req)
2077                         goto out_partial;
2078                 /* status and version are initially zero-filled */
2079
2080                 rbd_img_obj_request_add(img_request, obj_request);
2081
2082                 image_offset += length;
2083                 resid -= length;
2084         }
2085
2086         return 0;
2087
2088 out_partial:
2089         rbd_obj_request_put(obj_request);
2090 out_unwind:
2091         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2092                 rbd_obj_request_put(obj_request);
2093
2094         return -ENOMEM;
2095 }
2096
2097 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2098 {
2099         struct rbd_img_request *img_request;
2100         u32 which = obj_request->which;
2101         bool more = true;
2102
2103         img_request = obj_request->img_request;
2104         rbd_assert(img_request != NULL);
2105         rbd_assert(img_request->rq != NULL);
2106         rbd_assert(which != BAD_WHICH);
2107         rbd_assert(which < img_request->obj_request_count);
2108         rbd_assert(which >= img_request->next_completion);
2109
2110         spin_lock_irq(&img_request->completion_lock);
2111         if (which != img_request->next_completion)
2112                 goto out;
2113
2114         for_each_obj_request_from(img_request, obj_request) {
2115                 unsigned int xferred;
2116                 int result;
2117
2118                 rbd_assert(more);
2119                 rbd_assert(which < img_request->obj_request_count);
2120
2121                 if (!atomic_read(&obj_request->done))
2122                         break;
2123
2124                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
2125                 xferred = (unsigned int) obj_request->xferred;
2126                 result = (int) obj_request->result;
2127                 if (result)
2128                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
2129                                 img_request->write_request ? "write" : "read",
2130                                 result, xferred);
2131
2132                 more = blk_end_request(img_request->rq, result, xferred);
2133                 which++;
2134         }
2135         rbd_assert(more ^ (which == img_request->obj_request_count));
2136         img_request->next_completion = which;
2137 out:
2138         spin_unlock_irq(&img_request->completion_lock);
2139
2140         if (!more)
2141                 rbd_img_request_complete(img_request);
2142 }
2143
2144 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2145 {
2146         struct rbd_device *rbd_dev = img_request->rbd_dev;
2147         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2148         struct rbd_obj_request *obj_request;
2149
2150         for_each_obj_request(img_request, obj_request) {
2151                 int ret;
2152
2153                 obj_request->callback = rbd_img_obj_callback;
2154                 ret = rbd_obj_request_submit(osdc, obj_request);
2155                 if (ret)
2156                         return ret;
2157                 /*
2158                  * The image request has its own reference to each
2159                  * of its object requests, so we can safely drop the
2160                  * initial one here.
2161                  */
2162                 rbd_obj_request_put(obj_request);
2163         }
2164
2165         return 0;
2166 }
2167
2168 static void rbd_request_fn(struct request_queue *q)
2169 {
2170         struct rbd_device *rbd_dev = q->queuedata;
2171         bool read_only = rbd_dev->mapping.read_only;
2172         struct request *rq;
2173         int result;
2174
2175         while ((rq = blk_fetch_request(q))) {
2176                 bool write_request = rq_data_dir(rq) == WRITE;
2177                 struct rbd_img_request *img_request;
2178                 u64 offset;
2179                 u64 length;
2180
2181                 /* Ignore any non-FS requests that filter through. */
2182
2183                 if (rq->cmd_type != REQ_TYPE_FS) {
2184                         __blk_end_request_all(rq, 0);
2185                         continue;
2186                 }
2187
2188                 spin_unlock_irq(q->queue_lock);
2189
2190                 /* Disallow writes to a read-only device */
2191
2192                 if (write_request) {
2193                         result = -EROFS;
2194                         if (read_only)
2195                                 goto end_request;
2196                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2197                 }
2198
2199                 /* Quit early if the snapshot has disappeared */
2200
2201                 if (!atomic_read(&rbd_dev->exists)) {
2202                         dout("request for non-existent snapshot");
2203                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2204                         result = -ENXIO;
2205                         goto end_request;
2206                 }
2207
2208                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2209                 length = (u64) blk_rq_bytes(rq);
2210
2211                 result = -EINVAL;
2212                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2213                         goto end_request;       /* Shouldn't happen */
2214
2215                 result = -ENOMEM;
2216                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2217                                                         write_request);
2218                 if (!img_request)
2219                         goto end_request;
2220
2221                 img_request->rq = rq;
2222
2223                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2224                 if (!result)
2225                         result = rbd_img_request_submit(img_request);
2226                 if (result)
2227                         rbd_img_request_put(img_request);
2228 end_request:
2229                 spin_lock_irq(q->queue_lock);
2230                 if (result < 0) {
2231                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
2232                                 write_request ? "write" : "read", result);
2233                         __blk_end_request_all(rq, result);
2234                 }
2235         }
2236 }
2237
2238 /*
2239  * block device queue callback
2240  */
2241 static void rbd_rq_fn(struct request_queue *q)
2242 {
2243         struct rbd_device *rbd_dev = q->queuedata;
2244         bool read_only = rbd_dev->mapping.read_only;
2245         struct request *rq;
2246
2247         while ((rq = blk_fetch_request(q))) {
2248                 struct ceph_snap_context *snapc = NULL;
2249                 unsigned int size = 0;
2250                 int result;
2251
2252                 dout("fetched request\n");
2253
2254                 /* Filter out block requests we don't understand */
2255
2256                 if ((rq->cmd_type != REQ_TYPE_FS)) {
2257                         __blk_end_request_all(rq, 0);
2258                         continue;
2259                 }
2260                 spin_unlock_irq(q->queue_lock);
2261
2262                 /* Write requests need a reference to the snapshot context */
2263
2264                 if (rq_data_dir(rq) == WRITE) {
2265                         result = -EROFS;
2266                         if (read_only) /* Can't write to a read-only device */
2267                                 goto out_end_request;
2268
2269                         /*
2270                          * Note that each osd request will take its
2271                          * own reference to the snapshot context
2272                          * supplied.  The reference we take here
2273                          * just guarantees the one we provide stays
2274                          * valid.
2275                          */
2276                         down_read(&rbd_dev->header_rwsem);
2277                         snapc = ceph_get_snap_context(rbd_dev->header.snapc);
2278                         up_read(&rbd_dev->header_rwsem);
2279                         rbd_assert(snapc != NULL);
2280                 } else if (!atomic_read(&rbd_dev->exists)) {
2281                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2282                         dout("request for non-existent snapshot");
2283                         result = -ENXIO;
2284                         goto out_end_request;
2285                 }
2286
2287                 size = blk_rq_bytes(rq);
2288                 result = rbd_dev_do_request(rq, rbd_dev, snapc,
2289                                 blk_rq_pos(rq) * SECTOR_SIZE,
2290                                 size, rq->bio);
2291 out_end_request:
2292                 if (snapc)
2293                         ceph_put_snap_context(snapc);
2294                 spin_lock_irq(q->queue_lock);
2295                 if (!size || result < 0)
2296                         __blk_end_request_all(rq, result);
2297         }
2298 }
2299
2300 /*
2301  * a queue callback. Makes sure that we don't create a bio that spans across
2302  * multiple osd objects. One exception would be with a single page bios,
2303  * which we handle later at bio_chain_clone_range()
2304  */
2305 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2306                           struct bio_vec *bvec)
2307 {
2308         struct rbd_device *rbd_dev = q->queuedata;
2309         sector_t sector_offset;
2310         sector_t sectors_per_obj;
2311         sector_t obj_sector_offset;
2312         int ret;
2313
2314         /*
2315          * Find how far into its rbd object the partition-relative
2316          * bio start sector is to offset relative to the enclosing
2317          * device.
2318          */
2319         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2320         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2321         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2322
2323         /*
2324          * Compute the number of bytes from that offset to the end
2325          * of the object.  Account for what's already used by the bio.
2326          */
2327         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2328         if (ret > bmd->bi_size)
2329                 ret -= bmd->bi_size;
2330         else
2331                 ret = 0;
2332
2333         /*
2334          * Don't send back more than was asked for.  And if the bio
2335          * was empty, let the whole thing through because:  "Note
2336          * that a block device *must* allow a single page to be
2337          * added to an empty bio."
2338          */
2339         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2340         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2341                 ret = (int) bvec->bv_len;
2342
2343         return ret;
2344 }
2345
2346 static void rbd_free_disk(struct rbd_device *rbd_dev)
2347 {
2348         struct gendisk *disk = rbd_dev->disk;
2349
2350         if (!disk)
2351                 return;
2352
2353         if (disk->flags & GENHD_FL_UP)
2354                 del_gendisk(disk);
2355         if (disk->queue)
2356                 blk_cleanup_queue(disk->queue);
2357         put_disk(disk);
2358 }
2359
2360 /*
2361  * Read the complete header for the given rbd device.
2362  *
2363  * Returns a pointer to a dynamically-allocated buffer containing
2364  * the complete and validated header.  Caller can pass the address
2365  * of a variable that will be filled in with the version of the
2366  * header object at the time it was read.
2367  *
2368  * Returns a pointer-coded errno if a failure occurs.
2369  */
2370 static struct rbd_image_header_ondisk *
2371 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2372 {
2373         struct rbd_image_header_ondisk *ondisk = NULL;
2374         u32 snap_count = 0;
2375         u64 names_size = 0;
2376         u32 want_count;
2377         int ret;
2378
2379         /*
2380          * The complete header will include an array of its 64-bit
2381          * snapshot ids, followed by the names of those snapshots as
2382          * a contiguous block of NUL-terminated strings.  Note that
2383          * the number of snapshots could change by the time we read
2384          * it in, in which case we re-read it.
2385          */
2386         do {
2387                 size_t size;
2388
2389                 kfree(ondisk);
2390
2391                 size = sizeof (*ondisk);
2392                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2393                 size += names_size;
2394                 ondisk = kmalloc(size, GFP_KERNEL);
2395                 if (!ondisk)
2396                         return ERR_PTR(-ENOMEM);
2397
2398                 ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
2399                                        0, size,
2400                                        (char *) ondisk, version);
2401
2402                 if (ret < 0)
2403                         goto out_err;
2404                 if (WARN_ON((size_t) ret < size)) {
2405                         ret = -ENXIO;
2406                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2407                                 size, ret);
2408                         goto out_err;
2409                 }
2410                 if (!rbd_dev_ondisk_valid(ondisk)) {
2411                         ret = -ENXIO;
2412                         rbd_warn(rbd_dev, "invalid header");
2413                         goto out_err;
2414                 }
2415
2416                 names_size = le64_to_cpu(ondisk->snap_names_len);
2417                 want_count = snap_count;
2418                 snap_count = le32_to_cpu(ondisk->snap_count);
2419         } while (snap_count != want_count);
2420
2421         return ondisk;
2422
2423 out_err:
2424         kfree(ondisk);
2425
2426         return ERR_PTR(ret);
2427 }
2428
2429 /*
2430  * reload the ondisk the header
2431  */
2432 static int rbd_read_header(struct rbd_device *rbd_dev,
2433                            struct rbd_image_header *header)
2434 {
2435         struct rbd_image_header_ondisk *ondisk;
2436         u64 ver = 0;
2437         int ret;
2438
2439         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2440         if (IS_ERR(ondisk))
2441                 return PTR_ERR(ondisk);
2442         ret = rbd_header_from_disk(header, ondisk);
2443         if (ret >= 0)
2444                 header->obj_version = ver;
2445         kfree(ondisk);
2446
2447         return ret;
2448 }
2449
2450 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2451 {
2452         struct rbd_snap *snap;
2453         struct rbd_snap *next;
2454
2455         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2456                 rbd_remove_snap_dev(snap);
2457 }
2458
2459 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2460 {
2461         sector_t size;
2462
2463         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2464                 return;
2465
2466         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2467         dout("setting size to %llu sectors", (unsigned long long) size);
2468         rbd_dev->mapping.size = (u64) size;
2469         set_capacity(rbd_dev->disk, size);
2470 }
2471
2472 /*
2473  * only read the first part of the ondisk header, without the snaps info
2474  */
2475 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2476 {
2477         int ret;
2478         struct rbd_image_header h;
2479
2480         ret = rbd_read_header(rbd_dev, &h);
2481         if (ret < 0)
2482                 return ret;
2483
2484         down_write(&rbd_dev->header_rwsem);
2485
2486         /* Update image size, and check for resize of mapped image */
2487         rbd_dev->header.image_size = h.image_size;
2488         rbd_update_mapping_size(rbd_dev);
2489
2490         /* rbd_dev->header.object_prefix shouldn't change */
2491         kfree(rbd_dev->header.snap_sizes);
2492         kfree(rbd_dev->header.snap_names);
2493         /* osd requests may still refer to snapc */
2494         ceph_put_snap_context(rbd_dev->header.snapc);
2495
2496         if (hver)
2497                 *hver = h.obj_version;
2498         rbd_dev->header.obj_version = h.obj_version;
2499         rbd_dev->header.image_size = h.image_size;
2500         rbd_dev->header.snapc = h.snapc;
2501         rbd_dev->header.snap_names = h.snap_names;
2502         rbd_dev->header.snap_sizes = h.snap_sizes;
2503         /* Free the extra copy of the object prefix */
2504         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2505         kfree(h.object_prefix);
2506
2507         ret = rbd_dev_snaps_update(rbd_dev);
2508         if (!ret)
2509                 ret = rbd_dev_snaps_register(rbd_dev);
2510
2511         up_write(&rbd_dev->header_rwsem);
2512
2513         return ret;
2514 }
2515
2516 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2517 {
2518         int ret;
2519
2520         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2521         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2522         if (rbd_dev->image_format == 1)
2523                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2524         else
2525                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2526         mutex_unlock(&ctl_mutex);
2527
2528         return ret;
2529 }
2530
2531 static int rbd_init_disk(struct rbd_device *rbd_dev)
2532 {
2533         struct gendisk *disk;
2534         struct request_queue *q;
2535         u64 segment_size;
2536
2537         /* create gendisk info */
2538         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2539         if (!disk)
2540                 return -ENOMEM;
2541
2542         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2543                  rbd_dev->dev_id);
2544         disk->major = rbd_dev->major;
2545         disk->first_minor = 0;
2546         disk->fops = &rbd_bd_ops;
2547         disk->private_data = rbd_dev;
2548
2549         (void) rbd_rq_fn;               /* avoid a warning */
2550         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2551         if (!q)
2552                 goto out_disk;
2553
2554         /* We use the default size, but let's be explicit about it. */
2555         blk_queue_physical_block_size(q, SECTOR_SIZE);
2556
2557         /* set io sizes to object size */
2558         segment_size = rbd_obj_bytes(&rbd_dev->header);
2559         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2560         blk_queue_max_segment_size(q, segment_size);
2561         blk_queue_io_min(q, segment_size);
2562         blk_queue_io_opt(q, segment_size);
2563
2564         blk_queue_merge_bvec(q, rbd_merge_bvec);
2565         disk->queue = q;
2566
2567         q->queuedata = rbd_dev;
2568
2569         rbd_dev->disk = disk;
2570
2571         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2572
2573         return 0;
2574 out_disk:
2575         put_disk(disk);
2576
2577         return -ENOMEM;
2578 }
2579
2580 /*
2581   sysfs
2582 */
2583
2584 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2585 {
2586         return container_of(dev, struct rbd_device, dev);
2587 }
2588
2589 static ssize_t rbd_size_show(struct device *dev,
2590                              struct device_attribute *attr, char *buf)
2591 {
2592         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2593         sector_t size;
2594
2595         down_read(&rbd_dev->header_rwsem);
2596         size = get_capacity(rbd_dev->disk);
2597         up_read(&rbd_dev->header_rwsem);
2598
2599         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2600 }
2601
2602 /*
2603  * Note this shows the features for whatever's mapped, which is not
2604  * necessarily the base image.
2605  */
2606 static ssize_t rbd_features_show(struct device *dev,
2607                              struct device_attribute *attr, char *buf)
2608 {
2609         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2610
2611         return sprintf(buf, "0x%016llx\n",
2612                         (unsigned long long) rbd_dev->mapping.features);
2613 }
2614
2615 static ssize_t rbd_major_show(struct device *dev,
2616                               struct device_attribute *attr, char *buf)
2617 {
2618         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2619
2620         return sprintf(buf, "%d\n", rbd_dev->major);
2621 }
2622
2623 static ssize_t rbd_client_id_show(struct device *dev,
2624                                   struct device_attribute *attr, char *buf)
2625 {
2626         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2627
2628         return sprintf(buf, "client%lld\n",
2629                         ceph_client_id(rbd_dev->rbd_client->client));
2630 }
2631
2632 static ssize_t rbd_pool_show(struct device *dev,
2633                              struct device_attribute *attr, char *buf)
2634 {
2635         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2636
2637         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2638 }
2639
2640 static ssize_t rbd_pool_id_show(struct device *dev,
2641                              struct device_attribute *attr, char *buf)
2642 {
2643         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2644
2645         return sprintf(buf, "%llu\n",
2646                 (unsigned long long) rbd_dev->spec->pool_id);
2647 }
2648
2649 static ssize_t rbd_name_show(struct device *dev,
2650                              struct device_attribute *attr, char *buf)
2651 {
2652         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2653
2654         if (rbd_dev->spec->image_name)
2655                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2656
2657         return sprintf(buf, "(unknown)\n");
2658 }
2659
2660 static ssize_t rbd_image_id_show(struct device *dev,
2661                              struct device_attribute *attr, char *buf)
2662 {
2663         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2664
2665         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2666 }
2667
2668 /*
2669  * Shows the name of the currently-mapped snapshot (or
2670  * RBD_SNAP_HEAD_NAME for the base image).
2671  */
2672 static ssize_t rbd_snap_show(struct device *dev,
2673                              struct device_attribute *attr,
2674                              char *buf)
2675 {
2676         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2677
2678         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2679 }
2680
2681 /*
2682  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2683  * for the parent image.  If there is no parent, simply shows
2684  * "(no parent image)".
2685  */
2686 static ssize_t rbd_parent_show(struct device *dev,
2687                              struct device_attribute *attr,
2688                              char *buf)
2689 {
2690         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2691         struct rbd_spec *spec = rbd_dev->parent_spec;
2692         int count;
2693         char *bufp = buf;
2694
2695         if (!spec)
2696                 return sprintf(buf, "(no parent image)\n");
2697
2698         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2699                         (unsigned long long) spec->pool_id, spec->pool_name);
2700         if (count < 0)
2701                 return count;
2702         bufp += count;
2703
2704         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2705                         spec->image_name ? spec->image_name : "(unknown)");
2706         if (count < 0)
2707                 return count;
2708         bufp += count;
2709
2710         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2711                         (unsigned long long) spec->snap_id, spec->snap_name);
2712         if (count < 0)
2713                 return count;
2714         bufp += count;
2715
2716         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2717         if (count < 0)
2718                 return count;
2719         bufp += count;
2720
2721         return (ssize_t) (bufp - buf);
2722 }
2723
2724 static ssize_t rbd_image_refresh(struct device *dev,
2725                                  struct device_attribute *attr,
2726                                  const char *buf,
2727                                  size_t size)
2728 {
2729         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2730         int ret;
2731
2732         ret = rbd_dev_refresh(rbd_dev, NULL);
2733
2734         return ret < 0 ? ret : size;
2735 }
2736
2737 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2738 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2739 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2740 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2741 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2742 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2743 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2744 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2745 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2746 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2747 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2748
2749 static struct attribute *rbd_attrs[] = {
2750         &dev_attr_size.attr,
2751         &dev_attr_features.attr,
2752         &dev_attr_major.attr,
2753         &dev_attr_client_id.attr,
2754         &dev_attr_pool.attr,
2755         &dev_attr_pool_id.attr,
2756         &dev_attr_name.attr,
2757         &dev_attr_image_id.attr,
2758         &dev_attr_current_snap.attr,
2759         &dev_attr_parent.attr,
2760         &dev_attr_refresh.attr,
2761         NULL
2762 };
2763
2764 static struct attribute_group rbd_attr_group = {
2765         .attrs = rbd_attrs,
2766 };
2767
2768 static const struct attribute_group *rbd_attr_groups[] = {
2769         &rbd_attr_group,
2770         NULL
2771 };
2772
2773 static void rbd_sysfs_dev_release(struct device *dev)
2774 {
2775 }
2776
2777 static struct device_type rbd_device_type = {
2778         .name           = "rbd",
2779         .groups         = rbd_attr_groups,
2780         .release        = rbd_sysfs_dev_release,
2781 };
2782
2783
2784 /*
2785   sysfs - snapshots
2786 */
2787
2788 static ssize_t rbd_snap_size_show(struct device *dev,
2789                                   struct device_attribute *attr,
2790                                   char *buf)
2791 {
2792         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2793
2794         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2795 }
2796
2797 static ssize_t rbd_snap_id_show(struct device *dev,
2798                                 struct device_attribute *attr,
2799                                 char *buf)
2800 {
2801         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2802
2803         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2804 }
2805
2806 static ssize_t rbd_snap_features_show(struct device *dev,
2807                                 struct device_attribute *attr,
2808                                 char *buf)
2809 {
2810         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2811
2812         return sprintf(buf, "0x%016llx\n",
2813                         (unsigned long long) snap->features);
2814 }
2815
2816 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2817 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2818 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2819
2820 static struct attribute *rbd_snap_attrs[] = {
2821         &dev_attr_snap_size.attr,
2822         &dev_attr_snap_id.attr,
2823         &dev_attr_snap_features.attr,
2824         NULL,
2825 };
2826
2827 static struct attribute_group rbd_snap_attr_group = {
2828         .attrs = rbd_snap_attrs,
2829 };
2830
2831 static void rbd_snap_dev_release(struct device *dev)
2832 {
2833         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2834         kfree(snap->name);
2835         kfree(snap);
2836 }
2837
2838 static const struct attribute_group *rbd_snap_attr_groups[] = {
2839         &rbd_snap_attr_group,
2840         NULL
2841 };
2842
2843 static struct device_type rbd_snap_device_type = {
2844         .groups         = rbd_snap_attr_groups,
2845         .release        = rbd_snap_dev_release,
2846 };
2847
2848 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2849 {
2850         kref_get(&spec->kref);
2851
2852         return spec;
2853 }
2854
2855 static void rbd_spec_free(struct kref *kref);
2856 static void rbd_spec_put(struct rbd_spec *spec)
2857 {
2858         if (spec)
2859                 kref_put(&spec->kref, rbd_spec_free);
2860 }
2861
2862 static struct rbd_spec *rbd_spec_alloc(void)
2863 {
2864         struct rbd_spec *spec;
2865
2866         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2867         if (!spec)
2868                 return NULL;
2869         kref_init(&spec->kref);
2870
2871         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2872
2873         return spec;
2874 }
2875
2876 static void rbd_spec_free(struct kref *kref)
2877 {
2878         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2879
2880         kfree(spec->pool_name);
2881         kfree(spec->image_id);
2882         kfree(spec->image_name);
2883         kfree(spec->snap_name);
2884         kfree(spec);
2885 }
2886
2887 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2888                                 struct rbd_spec *spec)
2889 {
2890         struct rbd_device *rbd_dev;
2891
2892         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2893         if (!rbd_dev)
2894                 return NULL;
2895
2896         spin_lock_init(&rbd_dev->lock);
2897         atomic_set(&rbd_dev->exists, 0);
2898         INIT_LIST_HEAD(&rbd_dev->node);
2899         INIT_LIST_HEAD(&rbd_dev->snaps);
2900         init_rwsem(&rbd_dev->header_rwsem);
2901
2902         rbd_dev->spec = spec;
2903         rbd_dev->rbd_client = rbdc;
2904
2905         /* Initialize the layout used for all rbd requests */
2906
2907         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2908         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2909         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2910         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2911
2912         return rbd_dev;
2913 }
2914
2915 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2916 {
2917         rbd_spec_put(rbd_dev->parent_spec);
2918         kfree(rbd_dev->header_name);
2919         rbd_put_client(rbd_dev->rbd_client);
2920         rbd_spec_put(rbd_dev->spec);
2921         kfree(rbd_dev);
2922 }
2923
2924 static bool rbd_snap_registered(struct rbd_snap *snap)
2925 {
2926         bool ret = snap->dev.type == &rbd_snap_device_type;
2927         bool reg = device_is_registered(&snap->dev);
2928
2929         rbd_assert(!ret ^ reg);
2930
2931         return ret;
2932 }
2933
2934 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2935 {
2936         list_del(&snap->node);
2937         if (device_is_registered(&snap->dev))
2938                 device_unregister(&snap->dev);
2939 }
2940
2941 static int rbd_register_snap_dev(struct rbd_snap *snap,
2942                                   struct device *parent)
2943 {
2944         struct device *dev = &snap->dev;
2945         int ret;
2946
2947         dev->type = &rbd_snap_device_type;
2948         dev->parent = parent;
2949         dev->release = rbd_snap_dev_release;
2950         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2951         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2952
2953         ret = device_register(dev);
2954
2955         return ret;
2956 }
2957
2958 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2959                                                 const char *snap_name,
2960                                                 u64 snap_id, u64 snap_size,
2961                                                 u64 snap_features)
2962 {
2963         struct rbd_snap *snap;
2964         int ret;
2965
2966         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2967         if (!snap)
2968                 return ERR_PTR(-ENOMEM);
2969
2970         ret = -ENOMEM;
2971         snap->name = kstrdup(snap_name, GFP_KERNEL);
2972         if (!snap->name)
2973                 goto err;
2974
2975         snap->id = snap_id;
2976         snap->size = snap_size;
2977         snap->features = snap_features;
2978
2979         return snap;
2980
2981 err:
2982         kfree(snap->name);
2983         kfree(snap);
2984
2985         return ERR_PTR(ret);
2986 }
2987
2988 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2989                 u64 *snap_size, u64 *snap_features)
2990 {
2991         char *snap_name;
2992
2993         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2994
2995         *snap_size = rbd_dev->header.snap_sizes[which];
2996         *snap_features = 0;     /* No features for v1 */
2997
2998         /* Skip over names until we find the one we are looking for */
2999
3000         snap_name = rbd_dev->header.snap_names;
3001         while (which--)
3002                 snap_name += strlen(snap_name) + 1;
3003
3004         return snap_name;
3005 }
3006
3007 /*
3008  * Get the size and object order for an image snapshot, or if
3009  * snap_id is CEPH_NOSNAP, gets this information for the base
3010  * image.
3011  */
3012 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3013                                 u8 *order, u64 *snap_size)
3014 {
3015         __le64 snapid = cpu_to_le64(snap_id);
3016         int ret;
3017         struct {
3018                 u8 order;
3019                 __le64 size;
3020         } __attribute__ ((packed)) size_buf = { 0 };
3021
3022         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3023                                 "rbd", "get_size",
3024                                 (char *) &snapid, sizeof (snapid),
3025                                 (char *) &size_buf, sizeof (size_buf), NULL);
3026         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3027         if (ret < 0)
3028                 return ret;
3029
3030         *order = size_buf.order;
3031         *snap_size = le64_to_cpu(size_buf.size);
3032
3033         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3034                 (unsigned long long) snap_id, (unsigned int) *order,
3035                 (unsigned long long) *snap_size);
3036
3037         return 0;
3038 }
3039
3040 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3041 {
3042         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3043                                         &rbd_dev->header.obj_order,
3044                                         &rbd_dev->header.image_size);
3045 }
3046
3047 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3048 {
3049         void *reply_buf;
3050         int ret;
3051         void *p;
3052
3053         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3054         if (!reply_buf)
3055                 return -ENOMEM;
3056
3057         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3058                                 "rbd", "get_object_prefix",
3059                                 NULL, 0,
3060                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3061         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3062         if (ret < 0)
3063                 goto out;
3064         ret = 0;    /* rbd_req_sync_exec() can return positive */
3065
3066         p = reply_buf;
3067         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3068                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3069                                                 NULL, GFP_NOIO);
3070
3071         if (IS_ERR(rbd_dev->header.object_prefix)) {
3072                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3073                 rbd_dev->header.object_prefix = NULL;
3074         } else {
3075                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3076         }
3077
3078 out:
3079         kfree(reply_buf);
3080
3081         return ret;
3082 }
3083
3084 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3085                 u64 *snap_features)
3086 {
3087         __le64 snapid = cpu_to_le64(snap_id);
3088         struct {
3089                 __le64 features;
3090                 __le64 incompat;
3091         } features_buf = { 0 };
3092         u64 incompat;
3093         int ret;
3094
3095         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3096                                 "rbd", "get_features",
3097                                 (char *) &snapid, sizeof (snapid),
3098                                 (char *) &features_buf, sizeof (features_buf),
3099                                 NULL);
3100         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3101         if (ret < 0)
3102                 return ret;
3103
3104         incompat = le64_to_cpu(features_buf.incompat);
3105         if (incompat & ~RBD_FEATURES_ALL)
3106                 return -ENXIO;
3107
3108         *snap_features = le64_to_cpu(features_buf.features);
3109
3110         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3111                 (unsigned long long) snap_id,
3112                 (unsigned long long) *snap_features,
3113                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3114
3115         return 0;
3116 }
3117
3118 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3119 {
3120         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3121                                                 &rbd_dev->header.features);
3122 }
3123
3124 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3125 {
3126         struct rbd_spec *parent_spec;
3127         size_t size;
3128         void *reply_buf = NULL;
3129         __le64 snapid;
3130         void *p;
3131         void *end;
3132         char *image_id;
3133         u64 overlap;
3134         int ret;
3135
3136         parent_spec = rbd_spec_alloc();
3137         if (!parent_spec)
3138                 return -ENOMEM;
3139
3140         size = sizeof (__le64) +                                /* pool_id */
3141                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3142                 sizeof (__le64) +                               /* snap_id */
3143                 sizeof (__le64);                                /* overlap */
3144         reply_buf = kmalloc(size, GFP_KERNEL);
3145         if (!reply_buf) {
3146                 ret = -ENOMEM;
3147                 goto out_err;
3148         }
3149
3150         snapid = cpu_to_le64(CEPH_NOSNAP);
3151         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3152                                 "rbd", "get_parent",
3153                                 (char *) &snapid, sizeof (snapid),
3154                                 (char *) reply_buf, size, NULL);
3155         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3156         if (ret < 0)
3157                 goto out_err;
3158
3159         ret = -ERANGE;
3160         p = reply_buf;
3161         end = (char *) reply_buf + size;
3162         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3163         if (parent_spec->pool_id == CEPH_NOPOOL)
3164                 goto out;       /* No parent?  No problem. */
3165
3166         /* The ceph file layout needs to fit pool id in 32 bits */
3167
3168         ret = -EIO;
3169         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3170                 goto out;
3171
3172         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3173         if (IS_ERR(image_id)) {
3174                 ret = PTR_ERR(image_id);
3175                 goto out_err;
3176         }
3177         parent_spec->image_id = image_id;
3178         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3179         ceph_decode_64_safe(&p, end, overlap, out_err);
3180
3181         rbd_dev->parent_overlap = overlap;
3182         rbd_dev->parent_spec = parent_spec;
3183         parent_spec = NULL;     /* rbd_dev now owns this */
3184 out:
3185         ret = 0;
3186 out_err:
3187         kfree(reply_buf);
3188         rbd_spec_put(parent_spec);
3189
3190         return ret;
3191 }
3192
3193 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3194 {
3195         size_t image_id_size;
3196         char *image_id;
3197         void *p;
3198         void *end;
3199         size_t size;
3200         void *reply_buf = NULL;
3201         size_t len = 0;
3202         char *image_name = NULL;
3203         int ret;
3204
3205         rbd_assert(!rbd_dev->spec->image_name);
3206
3207         len = strlen(rbd_dev->spec->image_id);
3208         image_id_size = sizeof (__le32) + len;
3209         image_id = kmalloc(image_id_size, GFP_KERNEL);
3210         if (!image_id)
3211                 return NULL;
3212
3213         p = image_id;
3214         end = (char *) image_id + image_id_size;
3215         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3216
3217         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3218         reply_buf = kmalloc(size, GFP_KERNEL);
3219         if (!reply_buf)
3220                 goto out;
3221
3222         ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
3223                                 "rbd", "dir_get_name",
3224                                 image_id, image_id_size,
3225                                 (char *) reply_buf, size, NULL);
3226         if (ret < 0)
3227                 goto out;
3228         p = reply_buf;
3229         end = (char *) reply_buf + size;
3230         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3231         if (IS_ERR(image_name))
3232                 image_name = NULL;
3233         else
3234                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3235 out:
3236         kfree(reply_buf);
3237         kfree(image_id);
3238
3239         return image_name;
3240 }
3241
3242 /*
3243  * When a parent image gets probed, we only have the pool, image,
3244  * and snapshot ids but not the names of any of them.  This call
3245  * is made later to fill in those names.  It has to be done after
3246  * rbd_dev_snaps_update() has completed because some of the
3247  * information (in particular, snapshot name) is not available
3248  * until then.
3249  */
3250 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3251 {
3252         struct ceph_osd_client *osdc;
3253         const char *name;
3254         void *reply_buf = NULL;
3255         int ret;
3256
3257         if (rbd_dev->spec->pool_name)
3258                 return 0;       /* Already have the names */
3259
3260         /* Look up the pool name */
3261
3262         osdc = &rbd_dev->rbd_client->client->osdc;
3263         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3264         if (!name) {
3265                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3266                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3267                 return -EIO;
3268         }
3269
3270         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3271         if (!rbd_dev->spec->pool_name)
3272                 return -ENOMEM;
3273
3274         /* Fetch the image name; tolerate failure here */
3275
3276         name = rbd_dev_image_name(rbd_dev);
3277         if (name)
3278                 rbd_dev->spec->image_name = (char *) name;
3279         else
3280                 rbd_warn(rbd_dev, "unable to get image name");
3281
3282         /* Look up the snapshot name. */
3283
3284         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3285         if (!name) {
3286                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3287                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3288                 ret = -EIO;
3289                 goto out_err;
3290         }
3291         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3292         if(!rbd_dev->spec->snap_name)
3293                 goto out_err;
3294
3295         return 0;
3296 out_err:
3297         kfree(reply_buf);
3298         kfree(rbd_dev->spec->pool_name);
3299         rbd_dev->spec->pool_name = NULL;
3300
3301         return ret;
3302 }
3303
3304 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3305 {
3306         size_t size;
3307         int ret;
3308         void *reply_buf;
3309         void *p;
3310         void *end;
3311         u64 seq;
3312         u32 snap_count;
3313         struct ceph_snap_context *snapc;
3314         u32 i;
3315
3316         /*
3317          * We'll need room for the seq value (maximum snapshot id),
3318          * snapshot count, and array of that many snapshot ids.
3319          * For now we have a fixed upper limit on the number we're
3320          * prepared to receive.
3321          */
3322         size = sizeof (__le64) + sizeof (__le32) +
3323                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3324         reply_buf = kzalloc(size, GFP_KERNEL);
3325         if (!reply_buf)
3326                 return -ENOMEM;
3327
3328         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3329                                 "rbd", "get_snapcontext",
3330                                 NULL, 0,
3331                                 reply_buf, size, ver);
3332         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3333         if (ret < 0)
3334                 goto out;
3335
3336         ret = -ERANGE;
3337         p = reply_buf;
3338         end = (char *) reply_buf + size;
3339         ceph_decode_64_safe(&p, end, seq, out);
3340         ceph_decode_32_safe(&p, end, snap_count, out);
3341
3342         /*
3343          * Make sure the reported number of snapshot ids wouldn't go
3344          * beyond the end of our buffer.  But before checking that,
3345          * make sure the computed size of the snapshot context we
3346          * allocate is representable in a size_t.
3347          */
3348         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3349                                  / sizeof (u64)) {
3350                 ret = -EINVAL;
3351                 goto out;
3352         }
3353         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3354                 goto out;
3355
3356         size = sizeof (struct ceph_snap_context) +
3357                                 snap_count * sizeof (snapc->snaps[0]);
3358         snapc = kmalloc(size, GFP_KERNEL);
3359         if (!snapc) {
3360                 ret = -ENOMEM;
3361                 goto out;
3362         }
3363
3364         atomic_set(&snapc->nref, 1);
3365         snapc->seq = seq;
3366         snapc->num_snaps = snap_count;
3367         for (i = 0; i < snap_count; i++)
3368                 snapc->snaps[i] = ceph_decode_64(&p);
3369
3370         rbd_dev->header.snapc = snapc;
3371
3372         dout("  snap context seq = %llu, snap_count = %u\n",
3373                 (unsigned long long) seq, (unsigned int) snap_count);
3374
3375 out:
3376         kfree(reply_buf);
3377
3378         return 0;
3379 }
3380
3381 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3382 {
3383         size_t size;
3384         void *reply_buf;
3385         __le64 snap_id;
3386         int ret;
3387         void *p;
3388         void *end;
3389         char *snap_name;
3390
3391         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3392         reply_buf = kmalloc(size, GFP_KERNEL);
3393         if (!reply_buf)
3394                 return ERR_PTR(-ENOMEM);
3395
3396         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3397         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
3398                                 "rbd", "get_snapshot_name",
3399                                 (char *) &snap_id, sizeof (snap_id),
3400                                 reply_buf, size, NULL);
3401         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3402         if (ret < 0)
3403                 goto out;
3404
3405         p = reply_buf;
3406         end = (char *) reply_buf + size;
3407         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3408         if (IS_ERR(snap_name)) {
3409                 ret = PTR_ERR(snap_name);
3410                 goto out;
3411         } else {
3412                 dout("  snap_id 0x%016llx snap_name = %s\n",
3413                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3414         }
3415         kfree(reply_buf);
3416
3417         return snap_name;
3418 out:
3419         kfree(reply_buf);
3420
3421         return ERR_PTR(ret);
3422 }
3423
3424 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3425                 u64 *snap_size, u64 *snap_features)
3426 {
3427         u64 snap_id;
3428         u8 order;
3429         int ret;
3430
3431         snap_id = rbd_dev->header.snapc->snaps[which];
3432         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3433         if (ret)
3434                 return ERR_PTR(ret);
3435         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3436         if (ret)
3437                 return ERR_PTR(ret);
3438
3439         return rbd_dev_v2_snap_name(rbd_dev, which);
3440 }
3441
3442 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3443                 u64 *snap_size, u64 *snap_features)
3444 {
3445         if (rbd_dev->image_format == 1)
3446                 return rbd_dev_v1_snap_info(rbd_dev, which,
3447                                         snap_size, snap_features);
3448         if (rbd_dev->image_format == 2)
3449                 return rbd_dev_v2_snap_info(rbd_dev, which,
3450                                         snap_size, snap_features);
3451         return ERR_PTR(-EINVAL);
3452 }
3453
3454 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3455 {
3456         int ret;
3457         __u8 obj_order;
3458
3459         down_write(&rbd_dev->header_rwsem);
3460
3461         /* Grab old order first, to see if it changes */
3462
3463         obj_order = rbd_dev->header.obj_order,
3464         ret = rbd_dev_v2_image_size(rbd_dev);
3465         if (ret)
3466                 goto out;
3467         if (rbd_dev->header.obj_order != obj_order) {
3468                 ret = -EIO;
3469                 goto out;
3470         }
3471         rbd_update_mapping_size(rbd_dev);
3472
3473         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3474         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3475         if (ret)
3476                 goto out;
3477         ret = rbd_dev_snaps_update(rbd_dev);
3478         dout("rbd_dev_snaps_update returned %d\n", ret);
3479         if (ret)
3480                 goto out;
3481         ret = rbd_dev_snaps_register(rbd_dev);
3482         dout("rbd_dev_snaps_register returned %d\n", ret);
3483 out:
3484         up_write(&rbd_dev->header_rwsem);
3485
3486         return ret;
3487 }
3488
3489 /*
3490  * Scan the rbd device's current snapshot list and compare it to the
3491  * newly-received snapshot context.  Remove any existing snapshots
3492  * not present in the new snapshot context.  Add a new snapshot for
3493  * any snapshots in the snapshot context not in the current list.
3494  * And verify there are no changes to snapshots we already know
3495  * about.
3496  *
3497  * Assumes the snapshots in the snapshot context are sorted by
3498  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3499  * are also maintained in that order.)
3500  */
3501 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3502 {
3503         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3504         const u32 snap_count = snapc->num_snaps;
3505         struct list_head *head = &rbd_dev->snaps;
3506         struct list_head *links = head->next;
3507         u32 index = 0;
3508
3509         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3510         while (index < snap_count || links != head) {
3511                 u64 snap_id;
3512                 struct rbd_snap *snap;
3513                 char *snap_name;
3514                 u64 snap_size = 0;
3515                 u64 snap_features = 0;
3516
3517                 snap_id = index < snap_count ? snapc->snaps[index]
3518                                              : CEPH_NOSNAP;
3519                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3520                                      : NULL;
3521                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3522
3523                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3524                         struct list_head *next = links->next;
3525
3526                         /* Existing snapshot not in the new snap context */
3527
3528                         if (rbd_dev->spec->snap_id == snap->id)
3529                                 atomic_set(&rbd_dev->exists, 0);
3530                         rbd_remove_snap_dev(snap);
3531                         dout("%ssnap id %llu has been removed\n",
3532                                 rbd_dev->spec->snap_id == snap->id ?
3533                                                         "mapped " : "",
3534                                 (unsigned long long) snap->id);
3535
3536                         /* Done with this list entry; advance */
3537
3538                         links = next;
3539                         continue;
3540                 }
3541
3542                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3543                                         &snap_size, &snap_features);
3544                 if (IS_ERR(snap_name))
3545                         return PTR_ERR(snap_name);
3546
3547                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3548                         (unsigned long long) snap_id);
3549                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3550                         struct rbd_snap *new_snap;
3551
3552                         /* We haven't seen this snapshot before */
3553
3554                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3555                                         snap_id, snap_size, snap_features);
3556                         if (IS_ERR(new_snap)) {
3557                                 int err = PTR_ERR(new_snap);
3558
3559                                 dout("  failed to add dev, error %d\n", err);
3560
3561                                 return err;
3562                         }
3563
3564                         /* New goes before existing, or at end of list */
3565
3566                         dout("  added dev%s\n", snap ? "" : " at end\n");
3567                         if (snap)
3568                                 list_add_tail(&new_snap->node, &snap->node);
3569                         else
3570                                 list_add_tail(&new_snap->node, head);
3571                 } else {
3572                         /* Already have this one */
3573
3574                         dout("  already present\n");
3575
3576                         rbd_assert(snap->size == snap_size);
3577                         rbd_assert(!strcmp(snap->name, snap_name));
3578                         rbd_assert(snap->features == snap_features);
3579
3580                         /* Done with this list entry; advance */
3581
3582                         links = links->next;
3583                 }
3584
3585                 /* Advance to the next entry in the snapshot context */
3586
3587                 index++;
3588         }
3589         dout("%s: done\n", __func__);
3590
3591         return 0;
3592 }
3593
3594 /*
3595  * Scan the list of snapshots and register the devices for any that
3596  * have not already been registered.
3597  */
3598 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3599 {
3600         struct rbd_snap *snap;
3601         int ret = 0;
3602
3603         dout("%s called\n", __func__);
3604         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3605                 return -EIO;
3606
3607         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3608                 if (!rbd_snap_registered(snap)) {
3609                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3610                         if (ret < 0)
3611                                 break;
3612                 }
3613         }
3614         dout("%s: returning %d\n", __func__, ret);
3615
3616         return ret;
3617 }
3618
3619 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3620 {
3621         struct device *dev;
3622         int ret;
3623
3624         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3625
3626         dev = &rbd_dev->dev;
3627         dev->bus = &rbd_bus_type;
3628         dev->type = &rbd_device_type;
3629         dev->parent = &rbd_root_dev;
3630         dev->release = rbd_dev_release;
3631         dev_set_name(dev, "%d", rbd_dev->dev_id);
3632         ret = device_register(dev);
3633
3634         mutex_unlock(&ctl_mutex);
3635
3636         return ret;
3637 }
3638
3639 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3640 {
3641         device_unregister(&rbd_dev->dev);
3642 }
3643
3644 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3645
3646 /*
3647  * Get a unique rbd identifier for the given new rbd_dev, and add
3648  * the rbd_dev to the global list.  The minimum rbd id is 1.
3649  */
3650 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3651 {
3652         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3653
3654         spin_lock(&rbd_dev_list_lock);
3655         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3656         spin_unlock(&rbd_dev_list_lock);
3657         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3658                 (unsigned long long) rbd_dev->dev_id);
3659 }
3660
3661 /*
3662  * Remove an rbd_dev from the global list, and record that its
3663  * identifier is no longer in use.
3664  */
3665 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3666 {
3667         struct list_head *tmp;
3668         int rbd_id = rbd_dev->dev_id;
3669         int max_id;
3670
3671         rbd_assert(rbd_id > 0);
3672
3673         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3674                 (unsigned long long) rbd_dev->dev_id);
3675         spin_lock(&rbd_dev_list_lock);
3676         list_del_init(&rbd_dev->node);
3677
3678         /*
3679          * If the id being "put" is not the current maximum, there
3680          * is nothing special we need to do.
3681          */
3682         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3683                 spin_unlock(&rbd_dev_list_lock);
3684                 return;
3685         }
3686
3687         /*
3688          * We need to update the current maximum id.  Search the
3689          * list to find out what it is.  We're more likely to find
3690          * the maximum at the end, so search the list backward.
3691          */
3692         max_id = 0;
3693         list_for_each_prev(tmp, &rbd_dev_list) {
3694                 struct rbd_device *rbd_dev;
3695
3696                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3697                 if (rbd_dev->dev_id > max_id)
3698                         max_id = rbd_dev->dev_id;
3699         }
3700         spin_unlock(&rbd_dev_list_lock);
3701
3702         /*
3703          * The max id could have been updated by rbd_dev_id_get(), in
3704          * which case it now accurately reflects the new maximum.
3705          * Be careful not to overwrite the maximum value in that
3706          * case.
3707          */
3708         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3709         dout("  max dev id has been reset\n");
3710 }
3711
/*
 * Advance *buf past any leading white space so that it points at the
 * start of the next token (or at the terminating '\0' if there is
 * none), and return the length of that token, i.e. the number of
 * consecutive non-white-space characters found there.  The string at
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;			/* Start of token */

	return strcspn(start, whitespace);	/* Token length */
}
3730
/*
 * Locate the next token in *buf and, provided it fits (together with
 * a terminating '\0') in the supplied token buffer, copy it there.
 * A copied token is always '\0'-terminated.  The string at *buf must
 * be '\0'-terminated on entry.
 *
 * Returns the token's length, not counting the '\0'.  A return value
 * of 0 means no token was found; a return value >= token_size means
 * the token did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token found,
 * including when the token was too large to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
3760
3761 /*
3762  * Finds the next token in *buf, dynamically allocates a buffer big
3763  * enough to hold a copy of it, and copies the token into the new
3764  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3765  * that a duplicate buffer is created even for a zero-length token.
3766  *
3767  * Returns a pointer to the newly-allocated duplicate, or a null
3768  * pointer if memory for the duplicate was not available.  If
3769  * the lenp argument is a non-null pointer, the length of the token
3770  * (not including the '\0') is returned in *lenp.
3771  *
3772  * If successful, the *buf pointer will be updated to point beyond
3773  * the end of the found token.
3774  *
3775  * Note: uses GFP_KERNEL for allocation.
3776  */
3777 static inline char *dup_token(const char **buf, size_t *lenp)
3778 {
3779         char *dup;
3780         size_t len;
3781
3782         len = next_token(buf);
3783         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3784         if (!dup)
3785                 return NULL;
3786         *(dup + len) = '\0';
3787         *buf += len;
3788
3789         if (lenp)
3790                 *lenp = len;
3791
3792         return dup;
3793 }
3794
3795 /*
3796  * Parse the options provided for an "rbd add" (i.e., rbd image
3797  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3798  * and the data written is passed here via a NUL-terminated buffer.
3799  * Returns 0 if successful or an error code otherwise.
3800  *
3801  * The information extracted from these options is recorded in
3802  * the other parameters which return dynamically-allocated
3803  * structures:
3804  *  ceph_opts
3805  *      The address of a pointer that will refer to a ceph options
3806  *      structure.  Caller must release the returned pointer using
3807  *      ceph_destroy_options() when it is no longer needed.
3808  *  rbd_opts
3809  *      Address of an rbd options pointer.  Fully initialized by
3810  *      this function; caller must release with kfree().
3811  *  spec
3812  *      Address of an rbd image specification pointer.  Fully
3813  *      initialized by this function based on parsed options.
3814  *      Caller must release with rbd_spec_put().
3815  *
3816  * The options passed take this form:
3817  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3818  * where:
3819  *  <mon_addrs>
3820  *      A comma-separated list of one or more monitor addresses.
3821  *      A monitor address is an ip address, optionally followed
3822  *      by a port number (separated by a colon).
3823  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3824  *  <options>
3825  *      A comma-separated list of ceph and/or rbd options.
3826  *  <pool_name>
3827  *      The name of the rados pool containing the rbd image.
3828  *  <image_name>
3829  *      The name of the image in that pool to map.
3830  *  <snap_id>
3831  *      An optional snapshot id.  If provided, the mapping will
3832  *      present data from the image at the time that snapshot was
3833  *      created.  The image head is used if no snapshot id is
3834  *      provided.  Snapshot mappings are always read-only.
3835  */
3836 static int rbd_add_parse_args(const char *buf,
3837                                 struct ceph_options **ceph_opts,
3838                                 struct rbd_options **opts,
3839                                 struct rbd_spec **rbd_spec)
3840 {
3841         size_t len;
3842         char *options;
3843         const char *mon_addrs;
3844         size_t mon_addrs_size;
3845         struct rbd_spec *spec = NULL;
3846         struct rbd_options *rbd_opts = NULL;
3847         struct ceph_options *copts;
3848         int ret;
3849
3850         /* The first four tokens are required */
3851
3852         len = next_token(&buf);
3853         if (!len) {
3854                 rbd_warn(NULL, "no monitor address(es) provided");
3855                 return -EINVAL;
3856         }
3857         mon_addrs = buf;
3858         mon_addrs_size = len + 1;
3859         buf += len;
3860
3861         ret = -EINVAL;
3862         options = dup_token(&buf, NULL);
3863         if (!options)
3864                 return -ENOMEM;
3865         if (!*options) {
3866                 rbd_warn(NULL, "no options provided");
3867                 goto out_err;
3868         }
3869
3870         spec = rbd_spec_alloc();
3871         if (!spec)
3872                 goto out_mem;
3873
3874         spec->pool_name = dup_token(&buf, NULL);
3875         if (!spec->pool_name)
3876                 goto out_mem;
3877         if (!*spec->pool_name) {
3878                 rbd_warn(NULL, "no pool name provided");
3879                 goto out_err;
3880         }
3881
3882         spec->image_name = dup_token(&buf, NULL);
3883         if (!spec->image_name)
3884                 goto out_mem;
3885         if (!*spec->image_name) {
3886                 rbd_warn(NULL, "no image name provided");
3887                 goto out_err;
3888         }
3889
3890         /*
3891          * Snapshot name is optional; default is to use "-"
3892          * (indicating the head/no snapshot).
3893          */
3894         len = next_token(&buf);
3895         if (!len) {
3896                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3897                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3898         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3899                 ret = -ENAMETOOLONG;
3900                 goto out_err;
3901         }
3902         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3903         if (!spec->snap_name)
3904                 goto out_mem;
3905         *(spec->snap_name + len) = '\0';
3906
3907         /* Initialize all rbd options to the defaults */
3908
3909         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3910         if (!rbd_opts)
3911                 goto out_mem;
3912
3913         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3914
3915         copts = ceph_parse_options(options, mon_addrs,
3916                                         mon_addrs + mon_addrs_size - 1,
3917                                         parse_rbd_opts_token, rbd_opts);
3918         if (IS_ERR(copts)) {
3919                 ret = PTR_ERR(copts);
3920                 goto out_err;
3921         }
3922         kfree(options);
3923
3924         *ceph_opts = copts;
3925         *opts = rbd_opts;
3926         *rbd_spec = spec;
3927
3928         return 0;
3929 out_mem:
3930         ret = -ENOMEM;
3931 out_err:
3932         kfree(rbd_opts);
3933         rbd_spec_put(spec);
3934         kfree(options);
3935
3936         return ret;
3937 }
3938
3939 /*
3940  * An rbd format 2 image has a unique identifier, distinct from the
3941  * name given to it by the user.  Internally, that identifier is
3942  * what's used to specify the names of objects related to the image.
3943  *
3944  * A special "rbd id" object is used to map an rbd image name to its
3945  * id.  If that object doesn't exist, then there is no v2 rbd image
3946  * with the supplied name.
3947  *
3948  * This function will record the given rbd_dev's image_id field if
3949  * it can be determined, and in that case will return 0.  If any
3950  * errors occur a negative errno will be returned and the rbd_dev's
3951  * image_id field will be unchanged (and should be NULL).
3952  */
3953 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3954 {
3955         int ret;
3956         size_t size;
3957         char *object_name;
3958         void *response;
3959         void *p;
3960
3961         /*
3962          * When probing a parent image, the image id is already
3963          * known (and the image name likely is not).  There's no
3964          * need to fetch the image id again in this case.
3965          */
3966         if (rbd_dev->spec->image_id)
3967                 return 0;
3968
3969         /*
3970          * First, see if the format 2 image id file exists, and if
3971          * so, get the image's persistent id from it.
3972          */
3973         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3974         object_name = kmalloc(size, GFP_NOIO);
3975         if (!object_name)
3976                 return -ENOMEM;
3977         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3978         dout("rbd id object name is %s\n", object_name);
3979
3980         /* Response will be an encoded string, which includes a length */
3981
3982         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3983         response = kzalloc(size, GFP_NOIO);
3984         if (!response) {
3985                 ret = -ENOMEM;
3986                 goto out;
3987         }
3988
3989         ret = rbd_req_sync_exec(rbd_dev, object_name,
3990                                 "rbd", "get_id",
3991                                 NULL, 0,
3992                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3993         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3994         if (ret < 0)
3995                 goto out;
3996         ret = 0;    /* rbd_req_sync_exec() can return positive */
3997
3998         p = response;
3999         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4000                                                 p + RBD_IMAGE_ID_LEN_MAX,
4001                                                 NULL, GFP_NOIO);
4002         if (IS_ERR(rbd_dev->spec->image_id)) {
4003                 ret = PTR_ERR(rbd_dev->spec->image_id);
4004                 rbd_dev->spec->image_id = NULL;
4005         } else {
4006                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4007         }
4008 out:
4009         kfree(response);
4010         kfree(object_name);
4011
4012         return ret;
4013 }
4014
4015 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4016 {
4017         int ret;
4018         size_t size;
4019
4020         /* Version 1 images have no id; empty string is used */
4021
4022         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4023         if (!rbd_dev->spec->image_id)
4024                 return -ENOMEM;
4025
4026         /* Record the header object name for this rbd image. */
4027
4028         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4029         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4030         if (!rbd_dev->header_name) {
4031                 ret = -ENOMEM;
4032                 goto out_err;
4033         }
4034         sprintf(rbd_dev->header_name, "%s%s",
4035                 rbd_dev->spec->image_name, RBD_SUFFIX);
4036
4037         /* Populate rbd image metadata */
4038
4039         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4040         if (ret < 0)
4041                 goto out_err;
4042
4043         /* Version 1 images have no parent (no layering) */
4044
4045         rbd_dev->parent_spec = NULL;
4046         rbd_dev->parent_overlap = 0;
4047
4048         rbd_dev->image_format = 1;
4049
4050         dout("discovered version 1 image, header name is %s\n",
4051                 rbd_dev->header_name);
4052
4053         return 0;
4054
4055 out_err:
4056         kfree(rbd_dev->header_name);
4057         rbd_dev->header_name = NULL;
4058         kfree(rbd_dev->spec->image_id);
4059         rbd_dev->spec->image_id = NULL;
4060
4061         return ret;
4062 }
4063
4064 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4065 {
4066         size_t size;
4067         int ret;
4068         u64 ver = 0;
4069
4070         /*
4071          * Image id was filled in by the caller.  Record the header
4072          * object name for this rbd image.
4073          */
4074         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4075         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4076         if (!rbd_dev->header_name)
4077                 return -ENOMEM;
4078         sprintf(rbd_dev->header_name, "%s%s",
4079                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4080
4081         /* Get the size and object order for the image */
4082
4083         ret = rbd_dev_v2_image_size(rbd_dev);
4084         if (ret < 0)
4085                 goto out_err;
4086
4087         /* Get the object prefix (a.k.a. block_name) for the image */
4088
4089         ret = rbd_dev_v2_object_prefix(rbd_dev);
4090         if (ret < 0)
4091                 goto out_err;
4092
4093         /* Get the and check features for the image */
4094
4095         ret = rbd_dev_v2_features(rbd_dev);
4096         if (ret < 0)
4097                 goto out_err;
4098
4099         /* If the image supports layering, get the parent info */
4100
4101         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4102                 ret = rbd_dev_v2_parent_info(rbd_dev);
4103                 if (ret < 0)
4104                         goto out_err;
4105         }
4106
4107         /* crypto and compression type aren't (yet) supported for v2 images */
4108
4109         rbd_dev->header.crypt_type = 0;
4110         rbd_dev->header.comp_type = 0;
4111
4112         /* Get the snapshot context, plus the header version */
4113
4114         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4115         if (ret)
4116                 goto out_err;
4117         rbd_dev->header.obj_version = ver;
4118
4119         rbd_dev->image_format = 2;
4120
4121         dout("discovered version 2 image, header name is %s\n",
4122                 rbd_dev->header_name);
4123
4124         return 0;
4125 out_err:
4126         rbd_dev->parent_overlap = 0;
4127         rbd_spec_put(rbd_dev->parent_spec);
4128         rbd_dev->parent_spec = NULL;
4129         kfree(rbd_dev->header_name);
4130         rbd_dev->header_name = NULL;
4131         kfree(rbd_dev->header.object_prefix);
4132         rbd_dev->header.object_prefix = NULL;
4133
4134         return ret;
4135 }
4136
4137 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4138 {
4139         int ret;
4140
4141         /* no need to lock here, as rbd_dev is not registered yet */
4142         ret = rbd_dev_snaps_update(rbd_dev);
4143         if (ret)
4144                 return ret;
4145
4146         ret = rbd_dev_probe_update_spec(rbd_dev);
4147         if (ret)
4148                 goto err_out_snaps;
4149
4150         ret = rbd_dev_set_mapping(rbd_dev);
4151         if (ret)
4152                 goto err_out_snaps;
4153
4154         /* generate unique id: find highest unique id, add one */
4155         rbd_dev_id_get(rbd_dev);
4156
4157         /* Fill in the device name, now that we have its id. */
4158         BUILD_BUG_ON(DEV_NAME_LEN
4159                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4160         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4161
4162         /* Get our block major device number. */
4163
4164         ret = register_blkdev(0, rbd_dev->name);
4165         if (ret < 0)
4166                 goto err_out_id;
4167         rbd_dev->major = ret;
4168
4169         /* Set up the blkdev mapping. */
4170
4171         ret = rbd_init_disk(rbd_dev);
4172         if (ret)
4173                 goto err_out_blkdev;
4174
4175         ret = rbd_bus_add_dev(rbd_dev);
4176         if (ret)
4177                 goto err_out_disk;
4178
4179         /*
4180          * At this point cleanup in the event of an error is the job
4181          * of the sysfs code (initiated by rbd_bus_del_dev()).
4182          */
4183         down_write(&rbd_dev->header_rwsem);
4184         ret = rbd_dev_snaps_register(rbd_dev);
4185         up_write(&rbd_dev->header_rwsem);
4186         if (ret)
4187                 goto err_out_bus;
4188
4189         ret = rbd_req_sync_watch(rbd_dev, 1);
4190         if (ret)
4191                 goto err_out_bus;
4192
4193         /* Everything's ready.  Announce the disk to the world. */
4194
4195         add_disk(rbd_dev->disk);
4196
4197         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4198                 (unsigned long long) rbd_dev->mapping.size);
4199
4200         return ret;
4201 err_out_bus:
4202         /* this will also clean up rest of rbd_dev stuff */
4203
4204         rbd_bus_del_dev(rbd_dev);
4205
4206         return ret;
4207 err_out_disk:
4208         rbd_free_disk(rbd_dev);
4209 err_out_blkdev:
4210         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4211 err_out_id:
4212         rbd_dev_id_put(rbd_dev);
4213 err_out_snaps:
4214         rbd_remove_all_snaps(rbd_dev);
4215
4216         return ret;
4217 }
4218
4219 /*
4220  * Probe for the existence of the header object for the given rbd
4221  * device.  For format 2 images this includes determining the image
4222  * id.
4223  */
4224 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4225 {
4226         int ret;
4227
4228         /*
4229          * Get the id from the image id object.  If it's not a
4230          * format 2 image, we'll get ENOENT back, and we'll assume
4231          * it's a format 1 image.
4232          */
4233         ret = rbd_dev_image_id(rbd_dev);
4234         if (ret)
4235                 ret = rbd_dev_v1_probe(rbd_dev);
4236         else
4237                 ret = rbd_dev_v2_probe(rbd_dev);
4238         if (ret) {
4239                 dout("probe failed, returning %d\n", ret);
4240
4241                 return ret;
4242         }
4243
4244         ret = rbd_dev_probe_finish(rbd_dev);
4245         if (ret)
4246                 rbd_header_free(&rbd_dev->header);
4247
4248         return ret;
4249 }
4250
4251 static ssize_t rbd_add(struct bus_type *bus,
4252                        const char *buf,
4253                        size_t count)
4254 {
4255         struct rbd_device *rbd_dev = NULL;
4256         struct ceph_options *ceph_opts = NULL;
4257         struct rbd_options *rbd_opts = NULL;
4258         struct rbd_spec *spec = NULL;
4259         struct rbd_client *rbdc;
4260         struct ceph_osd_client *osdc;
4261         int rc = -ENOMEM;
4262
4263         if (!try_module_get(THIS_MODULE))
4264                 return -ENODEV;
4265
4266         /* parse add command */
4267         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4268         if (rc < 0)
4269                 goto err_out_module;
4270
4271         rbdc = rbd_get_client(ceph_opts);
4272         if (IS_ERR(rbdc)) {
4273                 rc = PTR_ERR(rbdc);
4274                 goto err_out_args;
4275         }
4276         ceph_opts = NULL;       /* rbd_dev client now owns this */
4277
4278         /* pick the pool */
4279         osdc = &rbdc->client->osdc;
4280         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4281         if (rc < 0)
4282                 goto err_out_client;
4283         spec->pool_id = (u64) rc;
4284
4285         /* The ceph file layout needs to fit pool id in 32 bits */
4286
4287         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4288                 rc = -EIO;
4289                 goto err_out_client;
4290         }
4291
4292         rbd_dev = rbd_dev_create(rbdc, spec);
4293         if (!rbd_dev)
4294                 goto err_out_client;
4295         rbdc = NULL;            /* rbd_dev now owns this */
4296         spec = NULL;            /* rbd_dev now owns this */
4297
4298         rbd_dev->mapping.read_only = rbd_opts->read_only;
4299         kfree(rbd_opts);
4300         rbd_opts = NULL;        /* done with this */
4301
4302         rc = rbd_dev_probe(rbd_dev);
4303         if (rc < 0)
4304                 goto err_out_rbd_dev;
4305
4306         return count;
4307 err_out_rbd_dev:
4308         rbd_dev_destroy(rbd_dev);
4309 err_out_client:
4310         rbd_put_client(rbdc);
4311 err_out_args:
4312         if (ceph_opts)
4313                 ceph_destroy_options(ceph_opts);
4314         kfree(rbd_opts);
4315         rbd_spec_put(spec);
4316 err_out_module:
4317         module_put(THIS_MODULE);
4318
4319         dout("Error adding device %s\n", buf);
4320
4321         return (ssize_t) rc;
4322 }
4323
4324 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4325 {
4326         struct list_head *tmp;
4327         struct rbd_device *rbd_dev;
4328
4329         spin_lock(&rbd_dev_list_lock);
4330         list_for_each(tmp, &rbd_dev_list) {
4331                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4332                 if (rbd_dev->dev_id == dev_id) {
4333                         spin_unlock(&rbd_dev_list_lock);
4334                         return rbd_dev;
4335                 }
4336         }
4337         spin_unlock(&rbd_dev_list_lock);
4338         return NULL;
4339 }
4340
4341 static void rbd_dev_release(struct device *dev)
4342 {
4343         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4344
4345         if (rbd_dev->watch_request) {
4346                 struct ceph_client *client = rbd_dev->rbd_client->client;
4347
4348                 ceph_osdc_unregister_linger_request(&client->osdc,
4349                                                     rbd_dev->watch_request);
4350         }
4351         if (rbd_dev->watch_event)
4352                 rbd_req_sync_watch(rbd_dev, 0);
4353
4354         /* clean up and free blkdev */
4355         rbd_free_disk(rbd_dev);
4356         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4357
4358         /* release allocated disk header fields */
4359         rbd_header_free(&rbd_dev->header);
4360
4361         /* done with the id, and with the rbd_dev */
4362         rbd_dev_id_put(rbd_dev);
4363         rbd_assert(rbd_dev->rbd_client != NULL);
4364         rbd_dev_destroy(rbd_dev);
4365
4366         /* release module ref */
4367         module_put(THIS_MODULE);
4368 }
4369
4370 static ssize_t rbd_remove(struct bus_type *bus,
4371                           const char *buf,
4372                           size_t count)
4373 {
4374         struct rbd_device *rbd_dev = NULL;
4375         int target_id, rc;
4376         unsigned long ul;
4377         int ret = count;
4378
4379         rc = strict_strtoul(buf, 10, &ul);
4380         if (rc)
4381                 return rc;
4382
4383         /* convert to int; abort if we lost anything in the conversion */
4384         target_id = (int) ul;
4385         if (target_id != ul)
4386                 return -EINVAL;
4387
4388         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4389
4390         rbd_dev = __rbd_get_dev(target_id);
4391         if (!rbd_dev) {
4392                 ret = -ENOENT;
4393                 goto done;
4394         }
4395
4396         if (rbd_dev->open_count) {
4397                 ret = -EBUSY;
4398                 goto done;
4399         }
4400
4401         rbd_remove_all_snaps(rbd_dev);
4402         rbd_bus_del_dev(rbd_dev);
4403
4404 done:
4405         mutex_unlock(&ctl_mutex);
4406
4407         return ret;
4408 }
4409
4410 /*
4411  * create control files in sysfs
4412  * /sys/bus/rbd/...
4413  */
4414 static int rbd_sysfs_init(void)
4415 {
4416         int ret;
4417
4418         ret = device_register(&rbd_root_dev);
4419         if (ret < 0)
4420                 return ret;
4421
4422         ret = bus_register(&rbd_bus_type);
4423         if (ret < 0)
4424                 device_unregister(&rbd_root_dev);
4425
4426         return ret;
4427 }
4428
4429 static void rbd_sysfs_cleanup(void)
4430 {
4431         bus_unregister(&rbd_bus_type);
4432         device_unregister(&rbd_root_dev);
4433 }
4434
4435 int __init rbd_init(void)
4436 {
4437         int rc;
4438
4439         rc = rbd_sysfs_init();
4440         if (rc)
4441                 return rc;
4442         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4443         return 0;
4444 }
4445
4446 void __exit rbd_exit(void)
4447 {
4448         rbd_sysfs_cleanup();
4449 }
4450
4451 module_init(rbd_init);
4452 module_exit(rbd_exit);
4453
4454 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4455 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4456 MODULE_DESCRIPTION("rados block device");
4457
4458 /* following authorship retained from original osdblk.c */
4459 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4460
4461 MODULE_LICENSE("GPL");