rbd: set the mapping size and features later
drivers/block/rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
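/*
 * A snapshot's sysfs name is the snapshot name prefixed with
 * RBD_SNAP_DEV_NAME_PREFIX, so the name itself may use at most
 * NAME_MAX bytes less the prefix length (the sizeof () - 1 drops
 * the terminating NUL from the count).
 */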

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
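/*
 * Why (5 * sizeof (int)) / 2 + 1: each byte of an int contributes at
 * most log10(256) ~= 2.41 decimal digits, so 2.5 digits per byte is a
 * safe over-estimate, and the + 1 leaves room for a minus sign.  For
 * a 4-byte int this gives 11 characters, enough for "-2147483648".
 */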

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable, so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
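/*
 * Note that for_each_obj_request_safe() walks the list in *reverse*.
 * rbd_img_obj_request_del() below asserts that the request being
 * removed has which == obj_request_count - 1, so teardown must
 * proceed from the highest "which" value down; iterating in reverse
 * preserves that invariant while entries are deleted.
 */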

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
/* Wrapped in do { } while (0) so it expands as one statement (no dangling else) */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with a specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with a specific addr and configuration; if one
 * does not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translating the on-disk header
 * format into our in-memory representation.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Free object_prefix so this early return can't leak it */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}
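/*
 * Format 1 stores snapshot names as a single buffer of NUL-terminated
 * strings packed back to back, in the same order as the ids in
 * snapc->snaps.  For example, "one\0two\0three\0" with which == 1
 * yields "two".
 */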

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
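/*
 * bsearch() requires the comparator to match the array's ordering,
 * and the osd keeps snapc->snaps sorted highest id first.  E.g. with
 * snaps = { 12, 7, 3 }, looking up id 7 returns &snaps[1].
 */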

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}
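/*
 * Together these bracket the lifetime of a mapping:
 * rbd_dev_mapping_set() records the size and features of the snapshot
 * (or head) being mapped, as looked up by rbd_snap_size() and
 * rbd_snap_features() above, and rbd_dev_mapping_clear() resets both
 * when the mapping is torn down.
 */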

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* Allocated from the slab cache, so kfree() would be wrong */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}
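/*
 * Example: with object_prefix "rb.0.1234.5678" and obj_order 22
 * (4 MiB objects), image offset 0x1280000 falls in segment 4, so the
 * object name is "rb.0.1234.5678.000000000004".
 */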

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
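/*
 * Worked example, again with obj_order 22 (4 MiB segments): an I/O of
 * 0x2000 bytes at image offset 0x3ff000 starts 0x3ff000 bytes into
 * segment 0, so rbd_segment_length() clamps the first piece to
 * 0x1000 bytes; the remaining 0x1000 bytes belong to segment 1.
 */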

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
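/*
 * Example (4 KiB pages): zero_pages(pages, 5000, 9000) zeroes bytes
 * 904..4095 of pages[1] (3192 bytes) and then bytes 0..807 of
 * pages[2] (808 bytes), 4000 bytes in total.
 */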

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
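/*
 * Example: for a source bio of three 512-byte vecs, offset 600 and
 * len 600 give idx 1 (voff 88) and end_idx 2, so vcnt 2: the clone's
 * first vec covers bytes 88..511 of vec 1 (424 bytes) and its last
 * vec covers bytes 0..175 of vec 2 (resid == 176 bytes).
 */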

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
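/*
 * A note on the barrier pairing above: each *_set() issues smp_mb()
 * after setting its bit and each *_test() issues smp_mb() before
 * reading it, so once a flag has been set, subsequent testers on any
 * CPU observe it.  Since flags are only ever set (never cleared), a
 * racing reader can at worst see a flag as momentarily still clear.
 */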
1471
1472 static void
1473 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1474 {
1475         u64 xferred = obj_request->xferred;
1476         u64 length = obj_request->length;
1477
1478         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1479                 obj_request, obj_request->img_request, obj_request->result,
1480                 xferred, length);
1481         /*
1482          * ENOENT means a hole in the image.  We zero-fill the
1483          * entire length of the request.  A short read also implies
1484          * zero-fill to the end of the request.  Either way we
1485          * update the xferred count to indicate the whole request
1486          * was satisfied.
1487          */
1488         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1489         if (obj_request->result == -ENOENT) {
1490                 if (obj_request->type == OBJ_REQUEST_BIO)
1491                         zero_bio_chain(obj_request->bio_list, 0);
1492                 else
1493                         zero_pages(obj_request->pages, 0, length);
1494                 obj_request->result = 0;
1495                 obj_request->xferred = length;
1496         } else if (xferred < length && !obj_request->result) {
1497                 if (obj_request->type == OBJ_REQUEST_BIO)
1498                         zero_bio_chain(obj_request->bio_list, xferred);
1499                 else
1500                         zero_pages(obj_request->pages, xferred, length);
1501                 obj_request->xferred = length;
1502         }
1503         obj_request_done_set(obj_request);
1504 }
1505
1506 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1507 {
1508         dout("%s: obj %p cb %p\n", __func__, obj_request,
1509                 obj_request->callback);
1510         if (obj_request->callback)
1511                 obj_request->callback(obj_request);
1512         else
1513                 complete_all(&obj_request->completion);
1514 }
1515
1516 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1517 {
1518         dout("%s: obj %p\n", __func__, obj_request);
1519         obj_request_done_set(obj_request);
1520 }
1521
1522 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1523 {
1524         struct rbd_img_request *img_request = NULL;
1525         struct rbd_device *rbd_dev = NULL;
1526         bool layered = false;
1527
1528         if (obj_request_img_data_test(obj_request)) {
1529                 img_request = obj_request->img_request;
1530                 layered = img_request && img_request_layered_test(img_request);
1531                 rbd_dev = img_request->rbd_dev;
1532         }
1533
1534         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1535                 obj_request, img_request, obj_request->result,
1536                 obj_request->xferred, obj_request->length);
1537         if (layered && obj_request->result == -ENOENT &&
1538                         obj_request->img_offset < rbd_dev->parent_overlap)
1539                 rbd_img_parent_read(obj_request);
1540         else if (img_request)
1541                 rbd_img_obj_request_read_callback(obj_request);
1542         else
1543                 obj_request_done_set(obj_request);
1544 }
1545
1546 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1547 {
1548         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1549                 obj_request->result, obj_request->length);
1550         /*
1551          * There is no such thing as a successful short write.  Set
1552          * it to our originally-requested length.
1553          */
1554         obj_request->xferred = obj_request->length;
1555         obj_request_done_set(obj_request);
1556 }
1557
1558 /*
1559  * For a simple stat call there's nothing to do.  We'll do more if
1560  * this is part of a write sequence for a layered image.
1561  */
1562 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1563 {
1564         dout("%s: obj %p\n", __func__, obj_request);
1565         obj_request_done_set(obj_request);
1566 }
1567
1568 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1569                                 struct ceph_msg *msg)
1570 {
1571         struct rbd_obj_request *obj_request = osd_req->r_priv;
1572         u16 opcode;
1573
1574         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1575         rbd_assert(osd_req == obj_request->osd_req);
1576         if (obj_request_img_data_test(obj_request)) {
1577                 rbd_assert(obj_request->img_request);
1578                 rbd_assert(obj_request->which != BAD_WHICH);
1579         } else {
1580                 rbd_assert(obj_request->which == BAD_WHICH);
1581         }
1582
1583         if (osd_req->r_result < 0)
1584                 obj_request->result = osd_req->r_result;
1585
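             /*
              * An rbd osd request carries at most two ops: a copyup
              * request uses a "copyup" method call followed by a
              * write (see rbd_osd_req_create_copyup()); everything
              * else is a single op.
              */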
1586         BUG_ON(osd_req->r_num_ops > 2);
1587
1588         /*
1589          * We support a 64-bit length, but ultimately it has to be
1590          * passed to blk_end_request(), which takes an unsigned int.
1591          */
1592         obj_request->xferred = osd_req->r_reply_op_len[0];
1593         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1594         opcode = osd_req->r_ops[0].op;
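             /* Completion is driven by the first (or only) op in the request */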
1595         switch (opcode) {
1596         case CEPH_OSD_OP_READ:
1597                 rbd_osd_read_callback(obj_request);
1598                 break;
1599         case CEPH_OSD_OP_WRITE:
1600                 rbd_osd_write_callback(obj_request);
1601                 break;
1602         case CEPH_OSD_OP_STAT:
1603                 rbd_osd_stat_callback(obj_request);
1604                 break;
1605         case CEPH_OSD_OP_CALL:
1606         case CEPH_OSD_OP_NOTIFY_ACK:
1607         case CEPH_OSD_OP_WATCH:
1608                 rbd_osd_trivial_callback(obj_request);
1609                 break;
1610         default:
1611                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1612                         obj_request->object_name, (unsigned short) opcode);
1613                 break;
1614         }
1615
1616         if (obj_request_done_test(obj_request))
1617                 rbd_obj_request_complete(obj_request);
1618 }
1619
1620 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1621 {
1622         struct rbd_img_request *img_request = obj_request->img_request;
1623         struct ceph_osd_request *osd_req = obj_request->osd_req;
1624         u64 snap_id;
1625
1626         rbd_assert(osd_req != NULL);
1627
1628         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1629         ceph_osdc_build_request(osd_req, obj_request->offset,
1630                         NULL, snap_id, NULL);
1631 }
1632
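     /*
      * Writes carry a snapshot context and a modification time; reads
      * (above) instead carry the snapshot id to read from.
      */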
1633 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1634 {
1635         struct rbd_img_request *img_request = obj_request->img_request;
1636         struct ceph_osd_request *osd_req = obj_request->osd_req;
1637         struct ceph_snap_context *snapc;
1638         struct timespec mtime = CURRENT_TIME;
1639
1640         rbd_assert(osd_req != NULL);
1641
1642         snapc = img_request ? img_request->snapc : NULL;
1643         ceph_osdc_build_request(osd_req, obj_request->offset,
1644                         snapc, CEPH_NOSNAP, &mtime);
1645 }
1646
1647 static struct ceph_osd_request *rbd_osd_req_create(
1648                                         struct rbd_device *rbd_dev,
1649                                         bool write_request,
1650                                         struct rbd_obj_request *obj_request)
1651 {
1652         struct ceph_snap_context *snapc = NULL;
1653         struct ceph_osd_client *osdc;
1654         struct ceph_osd_request *osd_req;
1655
1656         if (obj_request_img_data_test(obj_request)) {
1657                 struct rbd_img_request *img_request = obj_request->img_request;
1658
1659                 rbd_assert(write_request ==
1660                                 img_request_write_test(img_request));
1661                 if (write_request)
1662                         snapc = img_request->snapc;
1663         }
1664
1665         /* Allocate and initialize the request, for the single op */
1666
1667         osdc = &rbd_dev->rbd_client->client->osdc;
1668         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1669         if (!osd_req)
1670                 return NULL;    /* ENOMEM */
1671
1672         if (write_request)
1673                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1674         else
1675                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1676
1677         osd_req->r_callback = rbd_osd_req_callback;
1678         osd_req->r_priv = obj_request;
1679
1680         osd_req->r_oid_len = strlen(obj_request->object_name);
1681         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1682         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1683
1684         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1685
1686         return osd_req;
1687 }
1688
1689 /*
1690  * Create a copyup osd request based on the information in the
1691  * object request supplied.  A copyup request has two osd ops,
1692  * a copyup method call, and a "normal" write request.
1693  */
1694 static struct ceph_osd_request *
1695 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1696 {
1697         struct rbd_img_request *img_request;
1698         struct ceph_snap_context *snapc;
1699         struct rbd_device *rbd_dev;
1700         struct ceph_osd_client *osdc;
1701         struct ceph_osd_request *osd_req;
1702
1703         rbd_assert(obj_request_img_data_test(obj_request));
1704         img_request = obj_request->img_request;
1705         rbd_assert(img_request);
1706         rbd_assert(img_request_write_test(img_request));
1707
1708         /* Allocate and initialize the request, for the two ops */
1709
1710         snapc = img_request->snapc;
1711         rbd_dev = img_request->rbd_dev;
1712         osdc = &rbd_dev->rbd_client->client->osdc;
1713         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1714         if (!osd_req)
1715                 return NULL;    /* ENOMEM */
1716
1717         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1718         osd_req->r_callback = rbd_osd_req_callback;
1719         osd_req->r_priv = obj_request;
1720
1721         osd_req->r_oid_len = strlen(obj_request->object_name);
1722         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1723         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1724
1725         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1726
1727         return osd_req;
1728 }
1729
1730
1731 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1732 {
1733         ceph_osdc_put_request(osd_req);
1734 }
1735
1736 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1737
1738 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1739                                                 u64 offset, u64 length,
1740                                                 enum obj_request_type type)
1741 {
1742         struct rbd_obj_request *obj_request;
1743         size_t size;
1744         char *name;
1745
1746         rbd_assert(obj_request_type_valid(type));
1747
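             /* The request keeps its own copy of the object name */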
1748         size = strlen(object_name) + 1;
1749         name = kmalloc(size, GFP_KERNEL);
1750         if (!name)
1751                 return NULL;
1752
1753         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1754         if (!obj_request) {
1755                 kfree(name);
1756                 return NULL;
1757         }
1758
1759         obj_request->object_name = memcpy(name, object_name, size);
1760         obj_request->offset = offset;
1761         obj_request->length = length;
1762         obj_request->flags = 0;
1763         obj_request->which = BAD_WHICH;
1764         obj_request->type = type;
1765         INIT_LIST_HEAD(&obj_request->links);
1766         init_completion(&obj_request->completion);
1767         kref_init(&obj_request->kref);
1768
1769         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1770                 offset, length, (int)type, obj_request);
1771
1772         return obj_request;
1773 }
1774
1775 static void rbd_obj_request_destroy(struct kref *kref)
1776 {
1777         struct rbd_obj_request *obj_request;
1778
1779         obj_request = container_of(kref, struct rbd_obj_request, kref);
1780
1781         dout("%s: obj %p\n", __func__, obj_request);
1782
1783         rbd_assert(obj_request->img_request == NULL);
1784         rbd_assert(obj_request->which == BAD_WHICH);
1785
1786         if (obj_request->osd_req)
1787                 rbd_osd_req_destroy(obj_request->osd_req);
1788
1789         rbd_assert(obj_request_type_valid(obj_request->type));
1790         switch (obj_request->type) {
1791         case OBJ_REQUEST_NODATA:
1792                 break;          /* Nothing to do */
1793         case OBJ_REQUEST_BIO:
1794                 if (obj_request->bio_list)
1795                         bio_chain_put(obj_request->bio_list);
1796                 break;
1797         case OBJ_REQUEST_PAGES:
1798                 if (obj_request->pages)
1799                         ceph_release_page_vector(obj_request->pages,
1800                                                 obj_request->page_count);
1801                 break;
1802         }
1803
1804         kfree(obj_request->object_name);
1805         obj_request->object_name = NULL;
1806         kmem_cache_free(rbd_obj_request_cache, obj_request);
1807 }
1808
1809 /*
1810  * Caller is responsible for filling in the list of object requests
1811  * that comprises the image request, and the Linux request pointer
1812  * (if there is one).
1813  */
1814 static struct rbd_img_request *rbd_img_request_create(
1815                                         struct rbd_device *rbd_dev,
1816                                         u64 offset, u64 length,
1817                                         bool write_request,
1818                                         bool child_request)
1819 {
1820         struct rbd_img_request *img_request;
             struct ceph_snap_context *snapc = NULL;
1821 
1822         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1823         if (!img_request)
1824                 return NULL;
1825
1826         if (write_request) {
                     /*
                      * Take the snap context reference under the header
                      * semaphore so the pointer recorded below is the
                      * same one we referenced, even if a header refresh
                      * intervenes.
                      */
1827                 down_read(&rbd_dev->header_rwsem);
1828                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1829                 up_read(&rbd_dev->header_rwsem);
1830         }
1831
1832         img_request->rq = NULL;
1833         img_request->rbd_dev = rbd_dev;
1834         img_request->offset = offset;
1835         img_request->length = length;
1836         img_request->flags = 0;
1837         if (write_request) {
1838                 img_request_write_set(img_request);
1839                 img_request->snapc = snapc;
1840         } else {
1841                 img_request->snap_id = rbd_dev->spec->snap_id;
1842         }
1843         if (child_request)
1844                 img_request_child_set(img_request);
1845         if (rbd_dev->parent_spec)
1846                 img_request_layered_set(img_request);
1847         spin_lock_init(&img_request->completion_lock);
1848         img_request->next_completion = 0;
1849         img_request->callback = NULL;
1850         img_request->result = 0;
1851         img_request->obj_request_count = 0;
1852         INIT_LIST_HEAD(&img_request->obj_requests);
1853         kref_init(&img_request->kref);
1854
1855         rbd_img_request_get(img_request);       /* Avoid "unused function" warning */
1856         rbd_img_request_put(img_request);       /* TEMPORARY: net reference no-op */
1857
1858         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1859                 write_request ? "write" : "read", offset, length,
1860                 img_request);
1861
1862         return img_request;
1863 }
1864
1865 static void rbd_img_request_destroy(struct kref *kref)
1866 {
1867         struct rbd_img_request *img_request;
1868         struct rbd_obj_request *obj_request;
1869         struct rbd_obj_request *next_obj_request;
1870
1871         img_request = container_of(kref, struct rbd_img_request, kref);
1872
1873         dout("%s: img %p\n", __func__, img_request);
1874
1875         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1876                 rbd_img_obj_request_del(img_request, obj_request);
1877         rbd_assert(img_request->obj_request_count == 0);
1878
1879         if (img_request_write_test(img_request))
1880                 ceph_put_snap_context(img_request->snapc);
1881
1882         if (img_request_child_test(img_request))
1883                 rbd_obj_request_put(img_request->obj_request);
1884
1885         kmem_cache_free(rbd_img_request_cache, img_request);
1886 }
1887
1888 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1889 {
1890         struct rbd_img_request *img_request;
1891         unsigned int xferred;
1892         int result;
1893         bool more;
1894
1895         rbd_assert(obj_request_img_data_test(obj_request));
1896         img_request = obj_request->img_request;
1897
1898         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1899         xferred = (unsigned int)obj_request->xferred;
1900         result = obj_request->result;
1901         if (result) {
1902                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1903
1904                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1905                         img_request_write_test(img_request) ? "write" : "read",
1906                         obj_request->length, obj_request->img_offset,
1907                         obj_request->offset);
1908                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1909                         result, xferred);
1910                 if (!img_request->result)
1911                         img_request->result = result;
1912         }
1913
1914         /* Image object requests don't own their page array */
1915
1916         if (obj_request->type == OBJ_REQUEST_PAGES) {
1917                 obj_request->pages = NULL;
1918                 obj_request->page_count = 0;
1919         }
1920
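             /*
              * Report whether the image request remains incomplete: a
              * child (parent read) request is incomplete until its
              * last object request ends; for a block-layer request,
              * blk_end_request() reports whether any of the request
              * is still outstanding.
              */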
1921         if (img_request_child_test(img_request)) {
1922                 rbd_assert(img_request->obj_request != NULL);
1923                 more = obj_request->which < img_request->obj_request_count - 1;
1924         } else {
1925                 rbd_assert(img_request->rq != NULL);
1926                 more = blk_end_request(img_request->rq, result, xferred);
1927         }
1928
1929         return more;
1930 }
1931
1932 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1933 {
1934         struct rbd_img_request *img_request;
1935         u32 which = obj_request->which;
1936         bool more = true;
1937
1938         rbd_assert(obj_request_img_data_test(obj_request));
1939         img_request = obj_request->img_request;
1940
1941         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1942         rbd_assert(img_request != NULL);
1943         rbd_assert(img_request->obj_request_count > 0);
1944         rbd_assert(which != BAD_WHICH);
1945         rbd_assert(which < img_request->obj_request_count);
1946         rbd_assert(which >= img_request->next_completion);
1947
1948         spin_lock_irq(&img_request->completion_lock);
1949         if (which != img_request->next_completion)
1950                 goto out;
1951
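             /*
              * Object requests are completed in order.  Starting from
              * the expected next completion, end each consecutive
              * request that has finished, stopping at the first one
              * still in flight.
              */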
1952         for_each_obj_request_from(img_request, obj_request) {
1953                 rbd_assert(more);
1954                 rbd_assert(which < img_request->obj_request_count);
1955
1956                 if (!obj_request_done_test(obj_request))
1957                         break;
1958                 more = rbd_img_obj_end_request(obj_request);
1959                 which++;
1960         }
1961
1962         rbd_assert(more ^ (which == img_request->obj_request_count));
1963         img_request->next_completion = which;
1964 out:
1965         spin_unlock_irq(&img_request->completion_lock);
1966
1967         if (!more)
1968                 rbd_img_request_complete(img_request);
1969 }
1970
1971 /*
1972  * Split up an image request into one or more object requests, each
1973  * to a different object.  The "type" parameter indicates whether
1974  * "data_desc" is the pointer to the head of a list of bio
1975  * structures, or the base of a page array.  In either case this
1976  * function assumes data_desc describes memory sufficient to hold
1977  * all data described by the image request.
1978  */
1979 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1980                                         enum obj_request_type type,
1981                                         void *data_desc)
1982 {
1983         struct rbd_device *rbd_dev = img_request->rbd_dev;
1984         struct rbd_obj_request *obj_request = NULL;
1985         struct rbd_obj_request *next_obj_request;
1986         bool write_request = img_request_write_test(img_request);
1987         struct bio *bio_list;
1988         unsigned int bio_offset = 0;
1989         struct page **pages;
1990         u64 img_offset;
1991         u64 resid;
1992         u16 opcode;
1993
1994         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1995                 (int)type, data_desc);
1996
1997         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1998         img_offset = img_request->offset;
1999         resid = img_request->length;
2000         rbd_assert(resid > 0);
2001
2002         if (type == OBJ_REQUEST_BIO) {
2003                 bio_list = data_desc;
2004                 rbd_assert(img_offset == (u64)bio_list->bi_sector << SECTOR_SHIFT);
2005         } else {
2006                 rbd_assert(type == OBJ_REQUEST_PAGES);
2007                 pages = data_desc;
2008         }
2009
2010         while (resid) {
2011                 struct ceph_osd_request *osd_req;
2012                 const char *object_name;
2013                 u64 offset;
2014                 u64 length;
2015
2016                 object_name = rbd_segment_name(rbd_dev, img_offset);
2017                 if (!object_name)
2018                         goto out_unwind;
2019                 offset = rbd_segment_offset(rbd_dev, img_offset);
2020                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2021                 obj_request = rbd_obj_request_create(object_name,
2022                                                 offset, length, type);
2023                 /* object request has its own copy of the object name */
2024                 rbd_segment_name_free(object_name);
2025                 if (!obj_request)
2026                         goto out_unwind;
2027
2028                 if (type == OBJ_REQUEST_BIO) {
2029                         unsigned int clone_size;
2030
2031                         rbd_assert(length <= (u64)UINT_MAX);
2032                         clone_size = (unsigned int)length;
2033                         obj_request->bio_list =
2034                                         bio_chain_clone_range(&bio_list,
2035                                                                 &bio_offset,
2036                                                                 clone_size,
2037                                                                 GFP_ATOMIC);
2038                         if (!obj_request->bio_list)
2039                                 goto out_partial;
2040                 } else {
2041                         unsigned int page_count;
2042
2043                         obj_request->pages = pages;
2044                         page_count = (u32)calc_pages_for(offset, length);
2045                         obj_request->page_count = page_count;
2046                         if ((offset + length) & ~PAGE_MASK)
2047                                 page_count--;   /* last page shared with next request */
2048                         pages += page_count;
2049                 }
2050
2051                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2052                                                 obj_request);
2053                 if (!osd_req)
2054                         goto out_partial;
2055                 obj_request->osd_req = osd_req;
2056                 obj_request->callback = rbd_img_obj_callback;
2057
2058                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2059                                                 0, 0);
2060                 if (type == OBJ_REQUEST_BIO)
2061                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2062                                         obj_request->bio_list, length);
2063                 else
2064                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2065                                         obj_request->pages, length,
2066                                         offset & ~PAGE_MASK, false, false);
2067
2068                 if (write_request)
2069                         rbd_osd_req_format_write(obj_request);
2070                 else
2071                         rbd_osd_req_format_read(obj_request);
2072
2073                 obj_request->img_offset = img_offset;
2074                 rbd_img_obj_request_add(img_request, obj_request);
2075
2076                 img_offset += length;
2077                 resid -= length;
2078         }
2079
2080         return 0;
2081
2082 out_partial:
2083         rbd_obj_request_put(obj_request);
2084 out_unwind:
2085         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2086                 rbd_img_obj_request_del(img_request, obj_request);
2087
2088         return -ENOMEM;
2089 }
2090
2091 static void
2092 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2093 {
2094         struct rbd_img_request *img_request;
2095         struct rbd_device *rbd_dev;
2096         u64 length;
2097         u32 page_count;
2098
2099         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2100         rbd_assert(obj_request_img_data_test(obj_request));
2101         img_request = obj_request->img_request;
2102         rbd_assert(img_request);
2103
2104         rbd_dev = img_request->rbd_dev;
2105         rbd_assert(rbd_dev);
2106         length = (u64)1 << rbd_dev->header.obj_order;
2107         page_count = (u32)calc_pages_for(0, length);
2108
2109         rbd_assert(obj_request->copyup_pages);
2110         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2111         obj_request->copyup_pages = NULL;
2112
2113         /*
2114          * We want the transfer count to reflect the size of the
2115          * original write request.  There is no such thing as a
2116          * successful short write, so if the request was successful
2117          * we can just set it to the originally-requested length.
2118          */
2119         if (!obj_request->result)
2120                 obj_request->xferred = obj_request->length;
2121
2122         /* Finish up with the normal image object callback */
2123
2124         rbd_img_obj_callback(obj_request);
2125 }
2126
2127 static void
2128 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2129 {
2130         struct rbd_obj_request *orig_request;
2131         struct ceph_osd_request *osd_req;
2132         struct ceph_osd_client *osdc;
2133         struct rbd_device *rbd_dev;
2134         struct page **pages;
2135         int result;
2136         u64 obj_size;
2137         u64 xferred;
2138
2139         rbd_assert(img_request_child_test(img_request));
2140
2141         /* First get what we need from the image request */
2142
2143         pages = img_request->copyup_pages;
2144         rbd_assert(pages != NULL);
2145         img_request->copyup_pages = NULL;
2146
2147         orig_request = img_request->obj_request;
2148         rbd_assert(orig_request != NULL);
2149         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2150         result = img_request->result;
2151         obj_size = img_request->length;
2152         xferred = img_request->xferred;
2153
2154         rbd_dev = img_request->rbd_dev;
2155         rbd_assert(rbd_dev);
             /* The read may have been clipped at the parent overlap */
2156         rbd_assert(obj_size <= (u64)1 << rbd_dev->header.obj_order);
2157
2158         rbd_img_request_put(img_request);
2159
2160         if (result)
2161                 goto out_err;
2162
2163         /* Allocate the new copyup osd request for the original request */
2164
2165         result = -ENOMEM;
2166         rbd_assert(!orig_request->osd_req);
2167         osd_req = rbd_osd_req_create_copyup(orig_request);
2168         if (!osd_req)
2169                 goto out_err;
2170         orig_request->osd_req = osd_req;
2171         orig_request->copyup_pages = pages;
2172
2173         /* Initialize the copyup op */
2174
2175         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2176         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2177                                                 false, false);
2178
2179         /* Then the original write request op */
2180
2181         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2182                                         orig_request->offset,
2183                                         orig_request->length, 0, 0);
2184         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2185                                         orig_request->length);
2186
2187         rbd_osd_req_format_write(orig_request);
2188
2189         /* All set, send it off. */
2190
2191         orig_request->callback = rbd_img_obj_copyup_callback;
2192         osdc = &rbd_dev->rbd_client->client->osdc;
2193         result = rbd_obj_request_submit(osdc, orig_request);
2194         if (!result)
2195                 return;
2196 out_err:
2197         /* Record the error code and complete the request */
2198
2199         orig_request->result = result;
2200         orig_request->xferred = 0;
2201         obj_request_done_set(orig_request);
2202         rbd_obj_request_complete(orig_request);
2203 }
2204
2205 /*
2206  * Read from the parent image the range of data that covers the
2207  * entire target of the given object request.  This is used for
2208  * satisfying a layered image write request when the target of an
2209  * object request from the image request does not exist.
2210  *
2211  * A page array big enough to hold the returned data is allocated
2212  * and supplied to rbd_img_request_fill() as the "data descriptor."
2213  * When the read completes, this page array will be transferred to
2214  * the original object request for the copyup operation.
2215  *
2216  * If an error occurs, record it as the result of the original
2217  * object request and mark it done so it gets completed.
2218  */
2219 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2220 {
2221         struct rbd_img_request *img_request = NULL;
2222         struct rbd_img_request *parent_request = NULL;
2223         struct rbd_device *rbd_dev;
2224         u64 img_offset;
2225         u64 length;
2226         struct page **pages = NULL;
2227         u32 page_count;
2228         int result;
2229
2230         rbd_assert(obj_request_img_data_test(obj_request));
2231         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2232
2233         img_request = obj_request->img_request;
2234         rbd_assert(img_request != NULL);
2235         rbd_dev = img_request->rbd_dev;
2236         rbd_assert(rbd_dev->parent != NULL);
2237
2238         /*
2239          * First things first.  The original osd request is of no
2240          * use to us any more; we'll need a new one that can hold
2241          * the two ops in a copyup request.  We'll get that later,
2242          * but for now we can release the old one.
2243          */
2244         rbd_osd_req_destroy(obj_request->osd_req);
2245         obj_request->osd_req = NULL;
2246
2247         /*
2248          * Determine the byte range covered by the object in the
2249          * child image to which the original request was to be sent.
2250          */
2251         img_offset = obj_request->img_offset - obj_request->offset;
2252         length = (u64)1 << rbd_dev->header.obj_order;
2253
2254         /*
2255          * There is no defined parent data beyond the parent
2256          * overlap, so limit what we read at that boundary if
2257          * necessary.
2258          */
2259         if (img_offset + length > rbd_dev->parent_overlap) {
2260                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2261                 length = rbd_dev->parent_overlap - img_offset;
2262         }
2263
2264         /*
2265          * Allocate a page array big enough to receive the data read
2266          * from the parent.
2267          */
             /*
              * Allocate a full object's worth of pages even if the
              * read was clipped at the parent overlap:  the copyup
              * completion path releases a full object's worth.
              */
2268         page_count = (u32)calc_pages_for(0,
                                 (u64)1 << rbd_dev->header.obj_order);
2269         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2270         if (IS_ERR(pages)) {
2271                 result = PTR_ERR(pages);
2272                 pages = NULL;
2273                 goto out_err;
2274         }
2275
2276         result = -ENOMEM;
2277         parent_request = rbd_img_request_create(rbd_dev->parent,
2278                                                 img_offset, length,
2279                                                 false, true);
2280         if (!parent_request)
2281                 goto out_err;
2282         rbd_obj_request_get(obj_request);
2283         parent_request->obj_request = obj_request;
2284
2285         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2286         if (result)
2287                 goto out_err;
2288         parent_request->copyup_pages = pages;
2289
2290         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2291         result = rbd_img_request_submit(parent_request);
2292         if (!result)
2293                 return 0;
2294
2295         parent_request->copyup_pages = NULL;
2296         parent_request->obj_request = NULL;
2297         rbd_obj_request_put(obj_request);
2298 out_err:
2299         if (pages)
2300                 ceph_release_page_vector(pages, page_count);
2301         if (parent_request)
2302                 rbd_img_request_put(parent_request);
2303         obj_request->result = result;
2304         obj_request->xferred = 0;
2305         obj_request_done_set(obj_request);
2306
2307         return result;
2308 }
2309
2310 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2311 {
2312         struct rbd_obj_request *orig_request;
2313         int result;
2314
2315         rbd_assert(!obj_request_img_data_test(obj_request));
2316
2317         /*
2318          * All we need from the object request is the original
2319          * request and the result of the STAT op.  Grab those, then
2320          * we're done with the request.
2321          */
2322         orig_request = obj_request->obj_request;
2323         obj_request->obj_request = NULL;
2324         rbd_assert(orig_request);
2325         rbd_assert(orig_request->img_request);
2326
2327         result = obj_request->result;
2328         obj_request->result = 0;
2329
2330         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2331                 obj_request, orig_request, result,
2332                 obj_request->xferred, obj_request->length);
2333         rbd_obj_request_put(obj_request);
2337
2338         /*
2339          * Our only purpose here is to determine whether the object
2340          * exists, and we don't want to treat the non-existence as
2341          * an error.  If something else comes back, transfer the
2342          * error to the original request and complete it now.
2343          */
2344         if (!result) {
2345                 obj_request_existence_set(orig_request, true);
2346         } else if (result == -ENOENT) {
2347                 obj_request_existence_set(orig_request, false);
2348         } else {
2349                 orig_request->result = result;
2350                 goto out;
2351         }
2352
2353         /*
2354          * Resubmit the original request now that we have recorded
2355          * whether the target object exists.
2356          */
2357         orig_request->result = rbd_img_obj_request_submit(orig_request);
2358 out:
2359         if (orig_request->result)
2360                 rbd_obj_request_complete(orig_request);
2361         rbd_obj_request_put(orig_request);
2362 }
2363
2364 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2365 {
2366         struct rbd_obj_request *stat_request;
2367         struct rbd_device *rbd_dev;
2368         struct ceph_osd_client *osdc;
2369         struct page **pages = NULL;
2370         u32 page_count;
2371         size_t size;
2372         int ret;
2373
2374         /*
2375          * The response data for a STAT call consists of:
2376          *     le64 length;
2377          *     struct {
2378          *         le32 tv_sec;
2379          *         le32 tv_nsec;
2380          *     } mtime;
2381          */
2382         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2383         page_count = (u32)calc_pages_for(0, size);
2384         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2385         if (IS_ERR(pages))
2386                 return PTR_ERR(pages);
2387
2388         ret = -ENOMEM;
2389         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2390                                                         OBJ_REQUEST_PAGES);
2391         if (!stat_request) {
                     /* No reference on obj_request has been taken yet */
                     ceph_release_page_vector(pages, page_count);
                     return -ENOMEM;
             }
2393
2394         rbd_obj_request_get(obj_request);
2395         stat_request->obj_request = obj_request;
2396         stat_request->pages = pages;
2397         stat_request->page_count = page_count;
2398
2399         rbd_assert(obj_request->img_request);
2400         rbd_dev = obj_request->img_request->rbd_dev;
2401         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2402                                                 stat_request);
2403         if (!stat_request->osd_req)
2404                 goto out;
2405         stat_request->callback = rbd_img_obj_exists_callback;
2406
2407         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2408         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2409                                         false, false);
2410         rbd_osd_req_format_read(stat_request);
2411
2412         osdc = &rbd_dev->rbd_client->client->osdc;
2413         ret = rbd_obj_request_submit(osdc, stat_request);
2414 out:
2415         if (ret)
2416                 rbd_obj_request_put(obj_request);
2417
2418         return ret;
2419 }
2420
2421 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2422 {
2423         struct rbd_img_request *img_request;
2424         struct rbd_device *rbd_dev;
2425         bool known;
2426
2427         rbd_assert(obj_request_img_data_test(obj_request));
2428
2429         img_request = obj_request->img_request;
2430         rbd_assert(img_request);
2431         rbd_dev = img_request->rbd_dev;
2432
2433         /*
2434          * Only writes to layered images need special handling.
2435          * Reads and non-layered writes are simple object requests.
2436          * Layered writes that start beyond the end of the overlap
2437          * with the parent have no parent data, so they too are
2438          * simple object requests.  Finally, if the target object is
2439          * known to already exist, its parent data has already been
2440          * copied, so a write to the object can also be handled as a
2441          * simple object request.
2442          */
2443         if (!img_request_write_test(img_request) ||
2444                 !img_request_layered_test(img_request) ||
2445                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2446                 ((known = obj_request_known_test(obj_request)) &&
2447                         obj_request_exists_test(obj_request))) {
2448
2450                 struct ceph_osd_client *osdc;
2451 
2453                 osdc = &rbd_dev->rbd_client->client->osdc;
2454
2455                 return rbd_obj_request_submit(osdc, obj_request);
2456         }
2457
2458         /*
2459          * It's a layered write.  The target object might exist but
2460          * we may not know that yet.  If we know it doesn't exist,
2461          * start by reading the data for the full target object from
2462          * the parent so we can use it for a copyup to the target.
2463          */
2464         if (known)
2465                 return rbd_img_obj_parent_read_full(obj_request);
2466
2467         /* We don't know whether the target exists.  Go find out. */
2468
2469         return rbd_img_obj_exists_submit(obj_request);
2470 }
2471
2472 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2473 {
2474         struct rbd_obj_request *obj_request;
2475         struct rbd_obj_request *next_obj_request;
2476
2477         dout("%s: img %p\n", __func__, img_request);
2478         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2479                 int ret;
2480
2481                 ret = rbd_img_obj_request_submit(obj_request);
2482                 if (ret)
2483                         return ret;
2484         }
2485
2486         return 0;
2487 }
2488
2489 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2490 {
2491         struct rbd_obj_request *obj_request;
2492         struct rbd_device *rbd_dev;
2493         u64 obj_end;
2494
2495         rbd_assert(img_request_child_test(img_request));
2496
2497         obj_request = img_request->obj_request;
2498         rbd_assert(obj_request);
2499         rbd_assert(obj_request->img_request);
2500
2501         obj_request->result = img_request->result;
2502         if (obj_request->result)
2503                 goto out;
2504
2505         /*
2506          * We need to zero anything beyond the parent overlap
2507          * boundary.  Since rbd_img_obj_request_read_callback()
2508          * will zero anything beyond the end of a short read, an
2509          * easy way to do this is to pretend the data from the
2510          * parent came up short--ending at the overlap boundary.
2511          */
2512         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2513         obj_end = obj_request->img_offset + obj_request->length;
2514         rbd_dev = obj_request->img_request->rbd_dev;
2515         if (obj_end > rbd_dev->parent_overlap) {
2516                 u64 xferred = 0;
2517
2518                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2519                         xferred = rbd_dev->parent_overlap -
2520                                         obj_request->img_offset;
2521
2522                 obj_request->xferred = min(img_request->xferred, xferred);
2523         } else {
2524                 obj_request->xferred = img_request->xferred;
2525         }
2526 out:
2527         rbd_img_request_put(img_request);
2528         rbd_img_obj_request_read_callback(obj_request);
2529         rbd_obj_request_complete(obj_request);
2530 }
2531
2532 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2533 {
2534         struct rbd_device *rbd_dev;
2535         struct rbd_img_request *img_request;
2536         int result;
2537
2538         rbd_assert(obj_request_img_data_test(obj_request));
2539         rbd_assert(obj_request->img_request != NULL);
2540         rbd_assert(obj_request->result == (s32) -ENOENT);
2541         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2542
2543         rbd_dev = obj_request->img_request->rbd_dev;
2544         rbd_assert(rbd_dev->parent != NULL);
2546         img_request = rbd_img_request_create(rbd_dev->parent,
2547                                                 obj_request->img_offset,
2548                                                 obj_request->length,
2549                                                 false, true);
2550         result = -ENOMEM;
2551         if (!img_request)
2552                 goto out_err;
2553
2554         rbd_obj_request_get(obj_request);
2555         img_request->obj_request = obj_request;
2556
2557         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2558                                         obj_request->bio_list);
2559         if (result)
2560                 goto out_err;
2561
2562         img_request->callback = rbd_img_parent_read_callback;
2563         result = rbd_img_request_submit(img_request);
2564         if (result)
2565                 goto out_err;
2566
2567         return;
2568 out_err:
2569         if (img_request)
2570                 rbd_img_request_put(img_request);
2571         obj_request->result = result;
2572         obj_request->xferred = 0;
2573         obj_request_done_set(obj_request);
2574 }
2575
2576 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2577 {
2578         struct rbd_obj_request *obj_request;
2579         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2580         int ret;
2581
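             /* Ack the notify so the osd can complete the originating notify */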
2582         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2583                                                         OBJ_REQUEST_NODATA);
2584         if (!obj_request)
2585                 return -ENOMEM;
2586
2587         ret = -ENOMEM;
2588         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2589         if (!obj_request->osd_req)
2590                 goto out;
2591         obj_request->callback = rbd_obj_request_put;
2592
2593         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2594                                         notify_id, 0, 0);
2595         rbd_osd_req_format_read(obj_request);
2596
2597         ret = rbd_obj_request_submit(osdc, obj_request);
2598 out:
2599         if (ret)
2600                 rbd_obj_request_put(obj_request);
2601
2602         return ret;
2603 }
2604
2605 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2606 {
2607         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2608         int ret;
2609
2610         if (!rbd_dev)
2611                 return;
2612
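             /* A header change triggered this notify; refresh, then ack it */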
2613         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2614                 rbd_dev->header_name, (unsigned long long)notify_id,
2615                 (unsigned int)opcode);
2616         ret = rbd_dev_refresh(rbd_dev);
2617         if (ret)
2618                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2619
2620         rbd_obj_notify_ack(rbd_dev, notify_id);
2621 }
2622
2623 /*
2624  * Request sync osd watch/unwatch.  The value of "start" determines
2625  * whether a watch request is being initiated or torn down.
2626  */
2627 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2628 {
2629         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2630         struct rbd_obj_request *obj_request;
2631         int ret;
2632
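             /*
              * Starting a watch implies no event or lingering request
              * exists yet; tearing one down implies both do.
              */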
2633         rbd_assert(start ^ !!rbd_dev->watch_event);
2634         rbd_assert(start ^ !!rbd_dev->watch_request);
2635
2636         if (start) {
2637                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2638                                                 &rbd_dev->watch_event);
2639                 if (ret < 0)
2640                         return ret;
2641                 rbd_assert(rbd_dev->watch_event != NULL);
2642         }
2643
2644         ret = -ENOMEM;
2645         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2646                                                         OBJ_REQUEST_NODATA);
2647         if (!obj_request)
2648                 goto out_cancel;
2649
2650         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2651         if (!obj_request->osd_req)
2652                 goto out_cancel;
2653
2654         if (start)
2655                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2656         else
2657                 ceph_osdc_unregister_linger_request(osdc,
2658                                         rbd_dev->watch_request->osd_req);
2659
2660         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2661                                 rbd_dev->watch_event->cookie, 0, start);
2662         rbd_osd_req_format_write(obj_request);
2663
2664         ret = rbd_obj_request_submit(osdc, obj_request);
2665         if (ret)
2666                 goto out_cancel;
2667         ret = rbd_obj_request_wait(obj_request);
2668         if (ret)
2669                 goto out_cancel;
2670         ret = obj_request->result;
2671         if (ret)
2672                 goto out_cancel;
2673
2674         /*
2675          * A watch request is set to linger, so the underlying osd
2676          * request won't go away until we unregister it.  We retain
2677          * a pointer to the object request during that time (in
2678          * rbd_dev->watch_request), so we'll keep a reference to
2679          * it.  We'll drop that reference (below) after we've
2680          * unregistered it.
2681          */
2682         if (start) {
2683                 rbd_dev->watch_request = obj_request;
2684
2685                 return 0;
2686         }
2687
2688         /* We have successfully torn down the watch request */
2689
2690         rbd_obj_request_put(rbd_dev->watch_request);
2691         rbd_dev->watch_request = NULL;
2692 out_cancel:
2693         /* Cancel the event if we're tearing down, or on error */
2694         ceph_osdc_cancel_event(rbd_dev->watch_event);
2695         rbd_dev->watch_event = NULL;
2696         if (obj_request)
2697                 rbd_obj_request_put(obj_request);
2698
2699         return ret;
2700 }
2701
2702 /*
2703  * Synchronous osd object method call.  Returns the number of bytes
2704  * returned in the inbound buffer, or a negative error code.
2705  */
2706 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2707                              const char *object_name,
2708                              const char *class_name,
2709                              const char *method_name,
2710                              const void *outbound,
2711                              size_t outbound_size,
2712                              void *inbound,
2713                              size_t inbound_size)
2714 {
2715         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2716         struct rbd_obj_request *obj_request;
2717         struct page **pages;
2718         u32 page_count;
2719         int ret;
2720
2721         /*
2722          * Method calls are ultimately read operations.  The result
2723          * should be placed into the inbound buffer provided.  They
2724          * also supply outbound data--parameters for the object
2725          * method.  Currently if this is present it will be a
2726          * snapshot id.
2727          */
2728         page_count = (u32)calc_pages_for(0, inbound_size);
2729         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2730         if (IS_ERR(pages))
2731                 return PTR_ERR(pages);
2732
2733         ret = -ENOMEM;
2734         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2735                                                         OBJ_REQUEST_PAGES);
2736         if (!obj_request)
2737                 goto out;
2738
2739         obj_request->pages = pages;
2740         obj_request->page_count = page_count;
2741
2742         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2743         if (!obj_request->osd_req)
2744                 goto out;
2745
2746         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2747                                         class_name, method_name);
2748         if (outbound_size) {
2749                 struct ceph_pagelist *pagelist;
2750
2751                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2752                 if (!pagelist)
2753                         goto out;
2754
2755                 ceph_pagelist_init(pagelist);
2756                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2757                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2758                                                 pagelist);
2759         }
2760         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2761                                         obj_request->pages, inbound_size,
2762                                         0, false, false);
2763         rbd_osd_req_format_read(obj_request);
2764
2765         ret = rbd_obj_request_submit(osdc, obj_request);
2766         if (ret)
2767                 goto out;
2768         ret = rbd_obj_request_wait(obj_request);
2769         if (ret)
2770                 goto out;
2771
2772         ret = obj_request->result;
2773         if (ret < 0)
2774                 goto out;
2775
2776         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2777         ret = (int)obj_request->xferred;
2778         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2779 out:
2780         if (obj_request)
2781                 rbd_obj_request_put(obj_request);
2782         else
2783                 ceph_release_page_vector(pages, page_count);
2784
2785         return ret;
2786 }
2787
2788 static void rbd_request_fn(struct request_queue *q)
2789                 __releases(q->queue_lock) __acquires(q->queue_lock)
2790 {
2791         struct rbd_device *rbd_dev = q->queuedata;
2792         bool read_only = rbd_dev->mapping.read_only;
2793         struct request *rq;
2794         int result;
2795
2796         while ((rq = blk_fetch_request(q))) {
2797                 bool write_request = rq_data_dir(rq) == WRITE;
2798                 struct rbd_img_request *img_request;
2799                 u64 offset;
2800                 u64 length;
2801
2802                 /* Ignore any non-FS requests that filter through. */
2803
2804                 if (rq->cmd_type != REQ_TYPE_FS) {
2805                         dout("%s: non-fs request type %d\n", __func__,
2806                                 (int) rq->cmd_type);
2807                         __blk_end_request_all(rq, 0);
2808                         continue;
2809                 }
2810
2811                 /* Ignore/skip any zero-length requests */
2812
2813                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2814                 length = (u64) blk_rq_bytes(rq);
2815
2816                 if (!length) {
2817                         dout("%s: zero-length request\n", __func__);
2818                         __blk_end_request_all(rq, 0);
2819                         continue;
2820                 }
2821
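                     /*
                      * The image request is built and submitted without
                      * the queue lock held; the allocations and osd
                      * submission below may block.
                      */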
2822                 spin_unlock_irq(q->queue_lock);
2823
2824                 /* Disallow writes to a read-only device */
2825
2826                 if (write_request) {
2827                         result = -EROFS;
2828                         if (read_only)
2829                                 goto end_request;
2830                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2831                 }
2832
2833                 /*
2834                  * Quit early if the mapped snapshot no longer
2835                  * exists.  It's still possible the snapshot will
2836                  * have disappeared by the time our request arrives
2837                  * at the osd, but there's no sense in sending it if
2838                  * we already know.
2839                  */
2840                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2841                         dout("%s: request for non-existent snapshot\n",
                                         __func__);
2842                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2843                         result = -ENXIO;
2844                         goto end_request;
2845                 }
2846
2847                 result = -EINVAL;
2848                 if (length > U64_MAX - offset) {
2849                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2850                                 offset, length);
2851                         goto end_request;       /* Shouldn't happen */
2852                 }
2853
2854                 result = -EIO;
2855                 if (offset + length > rbd_dev->mapping.size) {
2856                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2857                                 offset, length, rbd_dev->mapping.size);
2858                         goto end_request;
2859                 }
2860
2861                 result = -ENOMEM;
2862                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2863                                                         write_request, false);
2864                 if (!img_request)
2865                         goto end_request;
2866
2867                 img_request->rq = rq;
2868
2869                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2870                                                 rq->bio);
2871                 if (!result)
2872                         result = rbd_img_request_submit(img_request);
2873                 if (result)
2874                         rbd_img_request_put(img_request);
2875 end_request:
2876                 spin_lock_irq(q->queue_lock);
2877                 if (result < 0) {
2878                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2879                                 write_request ? "write" : "read",
2880                                 length, offset, result);
2881
2882                         __blk_end_request_all(rq, result);
2883                 }
2884         }
2885 }
2886
2887 /*
2888  * A queue callback.  Makes sure that we don't create a bio that spans
2889  * across multiple osd objects.  One exception would be with single-page
2890  * bios, which we handle later at bio_chain_clone_range().
2891  */
2892 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2893                           struct bio_vec *bvec)
2894 {
2895         struct rbd_device *rbd_dev = q->queuedata;
2896         sector_t sector_offset;
2897         sector_t sectors_per_obj;
2898         sector_t obj_sector_offset;
2899         int ret;
2900
2901         /*
2902          * Offset the partition-relative bio start sector so it is
2903          * relative to the whole device, then find how far into its
2904          * rbd object that sector falls.
2905          */
2906         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2907         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2908         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2909
2910         /*
2911          * Compute the number of bytes from that offset to the end
2912          * of the object.  Account for what's already used by the bio.
2913          */
2914         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2915         if (ret > bmd->bi_size)
2916                 ret -= bmd->bi_size;
2917         else
2918                 ret = 0;
2919
2920         /*
2921          * Don't send back more than was asked for.  And if the bio
2922          * was empty, let the whole thing through because:  "Note
2923          * that a block device *must* allow a single page to be
2924          * added to an empty bio."
2925          */
2926         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2927         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2928                 ret = (int) bvec->bv_len;
2929
2930         return ret;
2931 }
2932
2933 static void rbd_free_disk(struct rbd_device *rbd_dev)
2934 {
2935         struct gendisk *disk = rbd_dev->disk;
2936
2937         if (!disk)
2938                 return;
2939
2940         rbd_dev->disk = NULL;
2941         if (disk->flags & GENHD_FL_UP) {
2942                 del_gendisk(disk);
2943                 if (disk->queue)
2944                         blk_cleanup_queue(disk->queue);
2945         }
2946         put_disk(disk);
2947 }
2948
2949 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2950                                 const char *object_name,
2951                                 u64 offset, u64 length, void *buf)
2952
2953 {
2954         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2955         struct rbd_obj_request *obj_request;
2956         struct page **pages = NULL;
2957         u32 page_count;
2958         size_t size;
2959         int ret;
2960
2961         page_count = (u32) calc_pages_for(offset, length);
2962         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2963         if (IS_ERR(pages))
2964                 return PTR_ERR(pages);
2965
2966         ret = -ENOMEM;
2967         obj_request = rbd_obj_request_create(object_name, offset, length,
2968                                                         OBJ_REQUEST_PAGES);
2969         if (!obj_request)
2970                 goto out;
2971
2972         obj_request->pages = pages;
2973         obj_request->page_count = page_count;
2974
2975         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2976         if (!obj_request->osd_req)
2977                 goto out;
2978
2979         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2980                                         offset, length, 0, 0);
2981         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2982                                         obj_request->pages,
2983                                         obj_request->length,
2984                                         obj_request->offset & ~PAGE_MASK,
2985                                         false, false);
2986         rbd_osd_req_format_read(obj_request);
2987
2988         ret = rbd_obj_request_submit(osdc, obj_request);
2989         if (ret)
2990                 goto out;
2991         ret = rbd_obj_request_wait(obj_request);
2992         if (ret)
2993                 goto out;
2994
2995         ret = obj_request->result;
2996         if (ret < 0)
2997                 goto out;
2998
2999         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3000         size = (size_t) obj_request->xferred;
3001         ceph_copy_from_page_vector(pages, buf, 0, size);
3002         rbd_assert(size <= (size_t)INT_MAX);
3003         ret = (int)size;
3004 out:
3005         if (obj_request)
3006                 rbd_obj_request_put(obj_request);
3007         else
3008                 ceph_release_page_vector(pages, page_count);
3009
3010         return ret;
3011 }
3012
3013 /*
3014  * Read the complete header for the given rbd device.
3015  *
3016  * Returns a pointer to a dynamically-allocated buffer containing
3017  * the complete and validated header.  The caller is responsible
3018  * for freeing that buffer with kfree() when it is no longer
3019  * needed.
3020  *
3021  * Returns an ERR_PTR-encoded errno if a failure occurs.
3022  */
3023 static struct rbd_image_header_ondisk *
3024 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3025 {
3026         struct rbd_image_header_ondisk *ondisk = NULL;
3027         u32 snap_count = 0;
3028         u64 names_size = 0;
3029         u32 want_count;
3030         int ret;
3031
3032         /*
3033          * The complete header will include an array of its 64-bit
3034          * snapshot ids, followed by the names of those snapshots as
3035          * a contiguous block of NUL-terminated strings.  Note that
3036          * the number of snapshots could change by the time we read
3037          * it in, in which case we re-read it.
3038          */
3039         do {
3040                 size_t size;
3041
3042                 kfree(ondisk);
3043
3044                 size = sizeof (*ondisk);
3045                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3046                 size += names_size;
3047                 ondisk = kmalloc(size, GFP_KERNEL);
3048                 if (!ondisk)
3049                         return ERR_PTR(-ENOMEM);
3050
3051                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3052                                        0, size, ondisk);
3053                 if (ret < 0)
3054                         goto out_err;
3055                 if ((size_t)ret < size) {
3056                         ret = -ENXIO;
3057                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3058                                 size, ret);
3059                         goto out_err;
3060                 }
3061                 if (!rbd_dev_ondisk_valid(ondisk)) {
3062                         ret = -ENXIO;
3063                         rbd_warn(rbd_dev, "invalid header");
3064                         goto out_err;
3065                 }
3066
3067                 names_size = le64_to_cpu(ondisk->snap_names_len);
3068                 want_count = snap_count;
3069                 snap_count = le32_to_cpu(ondisk->snap_count);
3070         } while (snap_count != want_count);
3071
3072         return ondisk;
3073
3074 out_err:
3075         kfree(ondisk);
3076
3077         return ERR_PTR(ret);
3078 }
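
/*
 * Illustrative sketch (not part of the driver): the do/while above is
 * a "size, read, re-check" loop.  The buffer is sized for the snapshot
 * count seen by the previous read; if the count decoded from the new
 * read differs, the whole header is fetched again.  Distilled, with
 * hypothetical buf_size()/fetch()/decode_count() helpers standing in
 * for the sizing and rbd_obj_read_sync() calls above:
 */
#if 0
	u32 count = 0;
	void *buf = NULL;

	do {
		u32 want = count;

		kfree(buf);
		buf = kmalloc(buf_size(count), GFP_KERNEL);
		if (!buf)
			return ERR_PTR(-ENOMEM);
		if (fetch(buf, buf_size(count)) < 0)
			break;	/* error handling elided */
		count = decode_count(buf);	/* count seen by this read */
	} while (count != want);
#endif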
3079
3080 /*
3081  * Reload the on-disk header into the given in-memory header.
3082  */
3083 static int rbd_read_header(struct rbd_device *rbd_dev,
3084                            struct rbd_image_header *header)
3085 {
3086         struct rbd_image_header_ondisk *ondisk;
3087         int ret;
3088
3089         ondisk = rbd_dev_v1_header_read(rbd_dev);
3090         if (IS_ERR(ondisk))
3091                 return PTR_ERR(ondisk);
3092         ret = rbd_header_from_disk(header, ondisk);
3093         kfree(ondisk);
3094
3095         return ret;
3096 }
3097
3098 /*
3099  * Re-read the on-disk header and refresh the in-memory copy.
3100  */
3101 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3102 {
3103         int ret;
3104         struct rbd_image_header h;
3105
3106         ret = rbd_read_header(rbd_dev, &h);
3107         if (ret < 0)
3108                 return ret;
3109
3110         down_write(&rbd_dev->header_rwsem);
3111
3112         /* Update image size, and check for resize of mapped image */
3113         rbd_dev->header.image_size = h.image_size;
3114         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3115                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3116                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3117
3118         /* rbd_dev->header.object_prefix shouldn't change */
3119         kfree(rbd_dev->header.snap_sizes);
3120         kfree(rbd_dev->header.snap_names);
3121         /* osd requests may still refer to snapc */
3122         ceph_put_snap_context(rbd_dev->header.snapc);
3123
3125         rbd_dev->header.snapc = h.snapc;
3126         rbd_dev->header.snap_names = h.snap_names;
3127         rbd_dev->header.snap_sizes = h.snap_sizes;
3128         /* Free the extra copy of the object prefix */
3129         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3130                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3131         kfree(h.object_prefix);
3132
3133         up_write(&rbd_dev->header_rwsem);
3134
3135         return ret;
3136 }
3137
3138 /*
3139  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3140  * has disappeared from the (just updated) snapshot context.
3141  */
3142 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3143 {
3144         u64 snap_id;
3145
3146         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3147                 return;
3148
3149         snap_id = rbd_dev->spec->snap_id;
3150         if (snap_id == CEPH_NOSNAP)
3151                 return;
3152
3153         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3154                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3155 }
3156
3157 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3158 {
3159         u64 mapping_size;
3160         int ret;
3161
3162         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3163         mapping_size = rbd_dev->mapping.size;
3164         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3165         if (rbd_dev->image_format == 1)
3166                 ret = rbd_dev_v1_refresh(rbd_dev);
3167         else
3168                 ret = rbd_dev_v2_refresh(rbd_dev);
3169
3170         /* If it's a mapped snapshot, validate its EXISTS flag */
3171
3172         rbd_exists_validate(rbd_dev);
3173         mutex_unlock(&ctl_mutex);
3174         if (mapping_size != rbd_dev->mapping.size) {
3175                 sector_t size;
3176
3177                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3178                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3179                 set_capacity(rbd_dev->disk, size);
3180                 revalidate_disk(rbd_dev->disk);
3181         }
3182
3183         return ret;
3184 }
3185
3186 static int rbd_init_disk(struct rbd_device *rbd_dev)
3187 {
3188         struct gendisk *disk;
3189         struct request_queue *q;
3190         u64 segment_size;
3191
3192         /* create gendisk info */
3193         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3194         if (!disk)
3195                 return -ENOMEM;
3196
3197         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3198                  rbd_dev->dev_id);
3199         disk->major = rbd_dev->major;
3200         disk->first_minor = 0;
3201         disk->fops = &rbd_bd_ops;
3202         disk->private_data = rbd_dev;
3203
3204         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3205         if (!q)
3206                 goto out_disk;
3207
3208         /* We use the default size, but let's be explicit about it. */
3209         blk_queue_physical_block_size(q, SECTOR_SIZE);
3210
3211         /* set io sizes to object size */
3212         segment_size = rbd_obj_bytes(&rbd_dev->header);
3213         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3214         blk_queue_max_segment_size(q, segment_size);
3215         blk_queue_io_min(q, segment_size);
3216         blk_queue_io_opt(q, segment_size);
3217
3218         blk_queue_merge_bvec(q, rbd_merge_bvec);
3219         disk->queue = q;
3220
3221         q->queuedata = rbd_dev;
3222
3223         rbd_dev->disk = disk;
3224
3225         return 0;
3226 out_disk:
3227         put_disk(disk);
3228
3229         return -ENOMEM;
3230 }
3231
3232 /*
3233  * sysfs
3234  */
3235
3236 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3237 {
3238         return container_of(dev, struct rbd_device, dev);
3239 }
3240
3241 static ssize_t rbd_size_show(struct device *dev,
3242                              struct device_attribute *attr, char *buf)
3243 {
3244         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3245
3246         return sprintf(buf, "%llu\n",
3247                 (unsigned long long)rbd_dev->mapping.size);
3248 }
3249
3250 /*
3251  * Note this shows the features for whatever's mapped, which is not
3252  * necessarily the base image.
3253  */
3254 static ssize_t rbd_features_show(struct device *dev,
3255                              struct device_attribute *attr, char *buf)
3256 {
3257         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3258
3259         return sprintf(buf, "0x%016llx\n",
3260                         (unsigned long long)rbd_dev->mapping.features);
3261 }
3262
3263 static ssize_t rbd_major_show(struct device *dev,
3264                               struct device_attribute *attr, char *buf)
3265 {
3266         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3267
3268         if (rbd_dev->major)
3269                 return sprintf(buf, "%d\n", rbd_dev->major);
3270
3271         return sprintf(buf, "(none)\n");
3272
3273 }
3274
3275 static ssize_t rbd_client_id_show(struct device *dev,
3276                                   struct device_attribute *attr, char *buf)
3277 {
3278         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3279
3280         return sprintf(buf, "client%lld\n",
3281                         ceph_client_id(rbd_dev->rbd_client->client));
3282 }
3283
3284 static ssize_t rbd_pool_show(struct device *dev,
3285                              struct device_attribute *attr, char *buf)
3286 {
3287         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288
3289         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3290 }
3291
3292 static ssize_t rbd_pool_id_show(struct device *dev,
3293                              struct device_attribute *attr, char *buf)
3294 {
3295         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3296
3297         return sprintf(buf, "%llu\n",
3298                         (unsigned long long) rbd_dev->spec->pool_id);
3299 }
3300
3301 static ssize_t rbd_name_show(struct device *dev,
3302                              struct device_attribute *attr, char *buf)
3303 {
3304         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3305
3306         if (rbd_dev->spec->image_name)
3307                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3308
3309         return sprintf(buf, "(unknown)\n");
3310 }
3311
3312 static ssize_t rbd_image_id_show(struct device *dev,
3313                              struct device_attribute *attr, char *buf)
3314 {
3315         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3316
3317         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3318 }
3319
3320 /*
3321  * Shows the name of the currently-mapped snapshot (or
3322  * RBD_SNAP_HEAD_NAME for the base image).
3323  */
3324 static ssize_t rbd_snap_show(struct device *dev,
3325                              struct device_attribute *attr,
3326                              char *buf)
3327 {
3328         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3329
3330         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3331 }
3332
3333 /*
3334  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3335  * for the parent image.  If there is no parent, simply shows
3336  * "(no parent image)".
3337  */
3338 static ssize_t rbd_parent_show(struct device *dev,
3339                              struct device_attribute *attr,
3340                              char *buf)
3341 {
3342         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3343         struct rbd_spec *spec = rbd_dev->parent_spec;
3344         int count;
3345         char *bufp = buf;
3346
3347         if (!spec)
3348                 return sprintf(buf, "(no parent image)\n");
3349
3350         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3351                         (unsigned long long) spec->pool_id, spec->pool_name);
3352         if (count < 0)
3353                 return count;
3354         bufp += count;
3355
3356         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3357                         spec->image_name ? spec->image_name : "(unknown)");
3358         if (count < 0)
3359                 return count;
3360         bufp += count;
3361
3362         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3363                         (unsigned long long) spec->snap_id, spec->snap_name);
3364         if (count < 0)
3365                 return count;
3366         bufp += count;
3367
3368         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3369         if (count < 0)
3370                 return count;
3371         bufp += count;
3372
3373         return (ssize_t) (bufp - buf);
3374 }
3375
3376 static ssize_t rbd_image_refresh(struct device *dev,
3377                                  struct device_attribute *attr,
3378                                  const char *buf,
3379                                  size_t size)
3380 {
3381         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3382         int ret;
3383
3384         ret = rbd_dev_refresh(rbd_dev);
3385         if (ret)
3386                 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3387
3388         return ret < 0 ? ret : size;
3389 }
3390
3391 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3392 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3393 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3394 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3395 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3396 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3397 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3398 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3399 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3400 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3401 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3402
3403 static struct attribute *rbd_attrs[] = {
3404         &dev_attr_size.attr,
3405         &dev_attr_features.attr,
3406         &dev_attr_major.attr,
3407         &dev_attr_client_id.attr,
3408         &dev_attr_pool.attr,
3409         &dev_attr_pool_id.attr,
3410         &dev_attr_name.attr,
3411         &dev_attr_image_id.attr,
3412         &dev_attr_current_snap.attr,
3413         &dev_attr_parent.attr,
3414         &dev_attr_refresh.attr,
3415         NULL
3416 };
3417
3418 static struct attribute_group rbd_attr_group = {
3419         .attrs = rbd_attrs,
3420 };
3421
3422 static const struct attribute_group *rbd_attr_groups[] = {
3423         &rbd_attr_group,
3424         NULL
3425 };
3426
3427 static void rbd_sysfs_dev_release(struct device *dev)
3428 {
3429 }
3430
3431 static struct device_type rbd_device_type = {
3432         .name           = "rbd",
3433         .groups         = rbd_attr_groups,
3434         .release        = rbd_sysfs_dev_release,
3435 };
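
/*
 * Illustrative sketch (not part of the driver): the attribute groups
 * above surface each mapping under /sys/bus/rbd/devices/<id>/ (see
 * Documentation/ABI/testing/sysfs-bus-rbd).  A minimal user-space
 * reader, assuming a device with id 0 is currently mapped:
 */
#if 0
#include <stdio.h>

int main(void)
{
	char buf[64];
	FILE *f = fopen("/sys/bus/rbd/devices/0/size", "r");

	if (!f)
		return 1;
	if (fgets(buf, sizeof(buf), f))
		printf("mapped size: %s", buf);	/* bytes, newline-terminated */
	fclose(f);
	return 0;
}
#endif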
3436
3437 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3438 {
3439         kref_get(&spec->kref);
3440
3441         return spec;
3442 }
3443
3444 static void rbd_spec_free(struct kref *kref);
3445 static void rbd_spec_put(struct rbd_spec *spec)
3446 {
3447         if (spec)
3448                 kref_put(&spec->kref, rbd_spec_free);
3449 }
3450
3451 static struct rbd_spec *rbd_spec_alloc(void)
3452 {
3453         struct rbd_spec *spec;
3454
3455         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3456         if (!spec)
3457                 return NULL;
3458         kref_init(&spec->kref);
3459
3460         return spec;
3461 }
3462
3463 static void rbd_spec_free(struct kref *kref)
3464 {
3465         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3466
3467         kfree(spec->pool_name);
3468         kfree(spec->image_id);
3469         kfree(spec->image_name);
3470         kfree(spec->snap_name);
3471         kfree(spec);
3472 }
3473
3474 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3475                                 struct rbd_spec *spec)
3476 {
3477         struct rbd_device *rbd_dev;
3478
3479         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3480         if (!rbd_dev)
3481                 return NULL;
3482
3483         spin_lock_init(&rbd_dev->lock);
3484         rbd_dev->flags = 0;
3485         INIT_LIST_HEAD(&rbd_dev->node);
3486         init_rwsem(&rbd_dev->header_rwsem);
3487
3488         rbd_dev->spec = spec;
3489         rbd_dev->rbd_client = rbdc;
3490
3491         /* Initialize the layout used for all rbd requests */
3492
3493         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3494         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3495         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3496         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3497
3498         return rbd_dev;
3499 }
3500
3501 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3502 {
3503         rbd_put_client(rbd_dev->rbd_client);
3504         rbd_spec_put(rbd_dev->spec);
3505         kfree(rbd_dev);
3506 }
3507
3508 /*
3509  * Get the size and object order for an image snapshot, or if
3510  * snap_id is CEPH_NOSNAP, gets this information for the base
3511  * image.
3512  */
3513 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3514                                 u8 *order, u64 *snap_size)
3515 {
3516         __le64 snapid = cpu_to_le64(snap_id);
3517         int ret;
3518         struct {
3519                 u8 order;
3520                 __le64 size;
3521         } __attribute__ ((packed)) size_buf = { 0 };
3522
3523         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3524                                 "rbd", "get_size",
3525                                 &snapid, sizeof (snapid),
3526                                 &size_buf, sizeof (size_buf));
3527         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3528         if (ret < 0)
3529                 return ret;
3530         if (ret < sizeof (size_buf))
3531                 return -ERANGE;
3532
3533         if (order)
3534                 *order = size_buf.order;
3535         *snap_size = le64_to_cpu(size_buf.size);
3536
3537         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3538                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3539                 (unsigned long long)*snap_size);
3540
3541         return 0;
3542 }
3543
3544 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3545 {
3546         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3547                                         &rbd_dev->header.obj_order,
3548                                         &rbd_dev->header.image_size);
3549 }
3550
3551 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3552 {
3553         void *reply_buf;
3554         int ret;
3555         void *p;
3556
3557         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3558         if (!reply_buf)
3559                 return -ENOMEM;
3560
3561         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3562                                 "rbd", "get_object_prefix", NULL, 0,
3563                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3564         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3565         if (ret < 0)
3566                 goto out;
3567
3568         p = reply_buf;
3569         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3570                                                 p + ret, NULL, GFP_NOIO);
3571         ret = 0;
3572
3573         if (IS_ERR(rbd_dev->header.object_prefix)) {
3574                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3575                 rbd_dev->header.object_prefix = NULL;
3576         } else {
3577                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3578         }
3579 out:
3580         kfree(reply_buf);
3581
3582         return ret;
3583 }
3584
3585 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3586                 u64 *snap_features)
3587 {
3588         __le64 snapid = cpu_to_le64(snap_id);
3589         struct {
3590                 __le64 features;
3591                 __le64 incompat;
3592         } __attribute__ ((packed)) features_buf = { 0 };
3593         u64 incompat;
3594         int ret;
3595
3596         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3597                                 "rbd", "get_features",
3598                                 &snapid, sizeof (snapid),
3599                                 &features_buf, sizeof (features_buf));
3600         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3601         if (ret < 0)
3602                 return ret;
3603         if (ret < sizeof (features_buf))
3604                 return -ERANGE;
3605
3606         incompat = le64_to_cpu(features_buf.incompat);
3607         if (incompat & ~RBD_FEATURES_SUPPORTED)
3608                 return -ENXIO;
3609
3610         *snap_features = le64_to_cpu(features_buf.features);
3611
3612         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3613                 (unsigned long long)snap_id,
3614                 (unsigned long long)*snap_features,
3615                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3616
3617         return 0;
3618 }
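
/*
 * Illustrative sketch (not part of the driver): the incompat test
 * above is the usual "refuse if the image requires a feature bit this
 * client does not implement" idiom.  With RBD_FEATURES_SUPPORTED
 * covering bits 0 and 1, a hypothetical image whose incompat mask also
 * sets bit 2 would be rejected:
 */
#if 0
	u64 incompat = (1ULL << 0) | (1ULL << 2);	/* hypothetical */

	if (incompat & ~RBD_FEATURES_SUPPORTED)		/* bit 2 unknown */
		return -ENXIO;				/* refuse to map */
#endif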
3619
3620 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3621 {
3622         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3623                                                 &rbd_dev->header.features);
3624 }
3625
3626 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3627 {
3628         struct rbd_spec *parent_spec;
3629         size_t size;
3630         void *reply_buf = NULL;
3631         __le64 snapid;
3632         void *p;
3633         void *end;
3634         char *image_id;
3635         u64 overlap;
3636         int ret;
3637
3638         parent_spec = rbd_spec_alloc();
3639         if (!parent_spec)
3640                 return -ENOMEM;
3641
3642         size = sizeof (__le64) +                                /* pool_id */
3643                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3644                 sizeof (__le64) +                               /* snap_id */
3645                 sizeof (__le64);                                /* overlap */
3646         reply_buf = kmalloc(size, GFP_KERNEL);
3647         if (!reply_buf) {
3648                 ret = -ENOMEM;
3649                 goto out_err;
3650         }
3651
3652         snapid = cpu_to_le64(CEPH_NOSNAP);
3653         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3654                                 "rbd", "get_parent",
3655                                 &snapid, sizeof (snapid),
3656                                 reply_buf, size);
3657         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3658         if (ret < 0)
3659                 goto out_err;
3660
3661         p = reply_buf;
3662         end = reply_buf + ret;
3663         ret = -ERANGE;
3664         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3665         if (parent_spec->pool_id == CEPH_NOPOOL)
3666                 goto out;       /* No parent?  No problem. */
3667
3668         /* The ceph file layout needs to fit pool id in 32 bits */
3669
3670         ret = -EIO;
3671         if (parent_spec->pool_id > (u64)U32_MAX) {
3672                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3673                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3674                 goto out_err;
3675         }
3676
3677         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3678         if (IS_ERR(image_id)) {
3679                 ret = PTR_ERR(image_id);
3680                 goto out_err;
3681         }
3682         parent_spec->image_id = image_id;
3683         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3684         ceph_decode_64_safe(&p, end, overlap, out_err);
3685
3686         rbd_dev->parent_overlap = overlap;
3687         rbd_dev->parent_spec = parent_spec;
3688         parent_spec = NULL;     /* rbd_dev now owns this */
3689 out:
3690         ret = 0;
3691 out_err:
3692         kfree(reply_buf);
3693         rbd_spec_put(parent_spec);
3694
3695         return ret;
3696 }
3697
3698 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3699 {
3700         struct {
3701                 __le64 stripe_unit;
3702                 __le64 stripe_count;
3703         } __attribute__ ((packed)) striping_info_buf = { 0 };
3704         size_t size = sizeof (striping_info_buf);
3705         void *p;
3706         u64 obj_size;
3707         u64 stripe_unit;
3708         u64 stripe_count;
3709         int ret;
3710
3711         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3712                                 "rbd", "get_stripe_unit_count", NULL, 0,
3713                                 (char *)&striping_info_buf, size);
3714         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3715         if (ret < 0)
3716                 return ret;
3717         if (ret < size)
3718                 return -ERANGE;
3719
3720         /*
3721          * We don't actually support the "fancy striping" feature
3722          * (STRIPINGV2) yet, but if the striping sizes are the
3723          * defaults the behavior is the same as before.  So find
3724          * out, and only fail if the image has non-default values.
3725          */
3727         obj_size = (u64)1 << rbd_dev->header.obj_order;
3728         p = &striping_info_buf;
3729         stripe_unit = ceph_decode_64(&p);
3730         if (stripe_unit != obj_size) {
3731                 rbd_warn(rbd_dev, "unsupported stripe unit "
3732                                 "(got %llu want %llu)",
3733                                 stripe_unit, obj_size);
3734                 return -EINVAL;
3735         }
3736         stripe_count = ceph_decode_64(&p);
3737         if (stripe_count != 1) {
3738                 rbd_warn(rbd_dev, "unsupported stripe count "
3739                                 "(got %llu want 1)", stripe_count);
3740                 return -EINVAL;
3741         }
3742         rbd_dev->header.stripe_unit = stripe_unit;
3743         rbd_dev->header.stripe_count = stripe_count;
3744
3745         return 0;
3746 }
3747
3748 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3749 {
3750         size_t image_id_size;
3751         char *image_id;
3752         void *p;
3753         void *end;
3754         size_t size;
3755         void *reply_buf = NULL;
3756         size_t len = 0;
3757         char *image_name = NULL;
3758         int ret;
3759
3760         rbd_assert(!rbd_dev->spec->image_name);
3761
3762         len = strlen(rbd_dev->spec->image_id);
3763         image_id_size = sizeof (__le32) + len;
3764         image_id = kmalloc(image_id_size, GFP_KERNEL);
3765         if (!image_id)
3766                 return NULL;
3767
3768         p = image_id;
3769         end = image_id + image_id_size;
3770         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3771
3772         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3773         reply_buf = kmalloc(size, GFP_KERNEL);
3774         if (!reply_buf)
3775                 goto out;
3776
3777         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3778                                 "rbd", "dir_get_name",
3779                                 image_id, image_id_size,
3780                                 reply_buf, size);
3781         if (ret < 0)
3782                 goto out;
3783         p = reply_buf;
3784         end = reply_buf + ret;
3785
3786         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3787         if (IS_ERR(image_name))
3788                 image_name = NULL;
3789         else
3790                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3791 out:
3792         kfree(reply_buf);
3793         kfree(image_id);
3794
3795         return image_name;
3796 }
3797
3798 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3799 {
3800         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3801         const char *snap_name;
3802         u32 which = 0;
3803
3804         /* Skip over names until we find the one we are looking for */
3805
3806         snap_name = rbd_dev->header.snap_names;
3807         while (which < snapc->num_snaps) {
3808                 if (!strcmp(name, snap_name))
3809                         return snapc->snaps[which];
3810                 snap_name += strlen(snap_name) + 1;
3811                 which++;
3812         }
3813         return CEPH_NOSNAP;
3814 }
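
/*
 * Illustrative sketch (not part of the driver): format 1 snapshot
 * names are stored as one contiguous block of NUL-terminated strings,
 * walked with strlen() + 1 exactly as above.  A standalone user-space
 * version of that walk:
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
	/* Three names packed back to back, as in header->snap_names. */
	const char names[] = "snap1\0snap2\0snap3";
	const char *p = names;
	unsigned int i;

	for (i = 0; i < 3; i++) {
		printf("snapshot %u: %s\n", i, p);
		p += strlen(p) + 1;	/* skip the name and its NUL */
	}
	return 0;
}
#endif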
3815
3816 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3817 {
3818         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3819         u32 which;
3820         bool found = false;
3821         u64 snap_id;
3822
3823         for (which = 0; !found && which < snapc->num_snaps; which++) {
3824                 const char *snap_name;
3825
3826                 snap_id = snapc->snaps[which];
3827                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3828                 if (IS_ERR(snap_name))
3829                         break;
3830                 found = !strcmp(name, snap_name);
3831                 kfree(snap_name);
3832         }
3833         return found ? snap_id : CEPH_NOSNAP;
3834 }
3835
3836 /*
3837  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3838  * no snapshot by that name is found, or if an error occurs.
3839  */
3840 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3841 {
3842         if (rbd_dev->image_format == 1)
3843                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3844
3845         return rbd_v2_snap_id_by_name(rbd_dev, name);
3846 }
3847
3848 /*
3849  * When an rbd image has a parent image, it is identified by the
3850  * pool, image, and snapshot ids (not names).  This function fills
3851  * in the names for those ids.  (It's OK if we can't figure out the
3852  * name for an image id, but the pool and snapshot ids should always
3853  * exist and have names.)  All names in an rbd spec are dynamically
3854  * allocated.
3855  *
3856  * When an image being mapped (not a parent) is probed, we have the
3857  * pool name and pool id, image name and image id, and the snapshot
3858  * name.  The only thing we're missing is the snapshot id.
3859  */
3860 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3861 {
3862         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863         struct rbd_spec *spec = rbd_dev->spec;
3864         const char *pool_name;
3865         const char *image_name;
3866         const char *snap_name;
3867         int ret;
3868
3869         /*
3870          * An image being mapped will have the pool name (etc.), but
3871          * we need to look up the snapshot id.
3872          */
3873         if (spec->pool_name) {
3874                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3875                         u64 snap_id;
3876
3877                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3878                         if (snap_id == CEPH_NOSNAP)
3879                                 return -ENOENT;
3880                         spec->snap_id = snap_id;
3881                 } else {
3882                         spec->snap_id = CEPH_NOSNAP;
3883                 }
3884
3885                 return 0;
3886         }
3887
3888         /* Get the pool name; we have to make our own copy of this */
3889
3890         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3891         if (!pool_name) {
3892                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3893                 return -EIO;
3894         }
3895         pool_name = kstrdup(pool_name, GFP_KERNEL);
3896         if (!pool_name)
3897                 return -ENOMEM;
3898
3899         /* Fetch the image name; tolerate failure here */
3900
3901         image_name = rbd_dev_image_name(rbd_dev);
3902         if (!image_name)
3903                 rbd_warn(rbd_dev, "unable to get image name");
3904
3905         /* Look up the snapshot name, and make a copy */
3906
3907         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3908         if (!snap_name) {
3909                 ret = -ENOMEM;
3910                 goto out_err;
3911         }
3912
3913         spec->pool_name = pool_name;
3914         spec->image_name = image_name;
3915         spec->snap_name = snap_name;
3916
3917         return 0;
3918 out_err:
3919         kfree(image_name);
3920         kfree(pool_name);
3921
3922         return ret;
3923 }
3924
3925 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3926 {
3927         size_t size;
3928         int ret;
3929         void *reply_buf;
3930         void *p;
3931         void *end;
3932         u64 seq;
3933         u32 snap_count;
3934         struct ceph_snap_context *snapc;
3935         u32 i;
3936
3937         /*
3938          * We'll need room for the seq value (maximum snapshot id),
3939          * snapshot count, and array of that many snapshot ids.
3940          * For now we have a fixed upper limit on the number we're
3941          * prepared to receive.
3942          */
3943         size = sizeof (__le64) + sizeof (__le32) +
3944                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3945         reply_buf = kzalloc(size, GFP_KERNEL);
3946         if (!reply_buf)
3947                 return -ENOMEM;
3948
3949         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3950                                 "rbd", "get_snapcontext", NULL, 0,
3951                                 reply_buf, size);
3952         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3953         if (ret < 0)
3954                 goto out;
3955
3956         p = reply_buf;
3957         end = reply_buf + ret;
3958         ret = -ERANGE;
3959         ceph_decode_64_safe(&p, end, seq, out);
3960         ceph_decode_32_safe(&p, end, snap_count, out);
3961
3962         /*
3963          * Make sure the reported number of snapshot ids wouldn't go
3964          * beyond the end of our buffer.  But before checking that,
3965          * make sure the computed size of the snapshot context we
3966          * allocate is representable in a size_t.
3967          */
3968         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3969                                  / sizeof (u64)) {
3970                 ret = -EINVAL;
3971                 goto out;
3972         }
3973         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3974                 goto out;
3975         ret = 0;
3976
3977         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3978         if (!snapc) {
3979                 ret = -ENOMEM;
3980                 goto out;
3981         }
3982         snapc->seq = seq;
3983         for (i = 0; i < snap_count; i++)
3984                 snapc->snaps[i] = ceph_decode_64(&p);
3985
3986         ceph_put_snap_context(rbd_dev->header.snapc);
3987         rbd_dev->header.snapc = snapc;
3988
3989         dout("  snap context seq = %llu, snap_count = %u\n",
3990                 (unsigned long long)seq, (unsigned int)snap_count);
3991 out:
3992         kfree(reply_buf);
3993
3994         return ret;
3995 }
3996
3997 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3998                                         u64 snap_id)
3999 {
4000         size_t size;
4001         void *reply_buf;
4002         __le64 snapid;
4003         int ret;
4004         void *p;
4005         void *end;
4006         char *snap_name;
4007
4008         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4009         reply_buf = kmalloc(size, GFP_KERNEL);
4010         if (!reply_buf)
4011                 return ERR_PTR(-ENOMEM);
4012
4013         snapid = cpu_to_le64(snap_id);
4014         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4015                                 "rbd", "get_snapshot_name",
4016                                 &snapid, sizeof (snapid),
4017                                 reply_buf, size);
4018         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4019         if (ret < 0) {
4020                 snap_name = ERR_PTR(ret);
4021                 goto out;
4022         }
4023
4024         p = reply_buf;
4025         end = reply_buf + ret;
4026         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4027         if (IS_ERR(snap_name))
4028                 goto out;
4029
4030         dout("  snap_id 0x%016llx snap_name = %s\n",
4031                 (unsigned long long)snap_id, snap_name);
4032 out:
4033         kfree(reply_buf);
4034
4035         return snap_name;
4036 }
4037
4038 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4039 {
4040         int ret;
4041
4042         down_write(&rbd_dev->header_rwsem);
4043
4044         ret = rbd_dev_v2_image_size(rbd_dev);
4045         if (ret)
4046                 goto out;
4047         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4048                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4049                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4050
4051         ret = rbd_dev_v2_snap_context(rbd_dev);
4052         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4055 out:
4056         up_write(&rbd_dev->header_rwsem);
4057
4058         return ret;
4059 }
4060
4061 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4062 {
4063         struct device *dev;
4064         int ret;
4065
4066         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4067
4068         dev = &rbd_dev->dev;
4069         dev->bus = &rbd_bus_type;
4070         dev->type = &rbd_device_type;
4071         dev->parent = &rbd_root_dev;
4072         dev->release = rbd_dev_device_release;
4073         dev_set_name(dev, "%d", rbd_dev->dev_id);
4074         ret = device_register(dev);
4075
4076         mutex_unlock(&ctl_mutex);
4077
4078         return ret;
4079 }
4080
4081 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4082 {
4083         device_unregister(&rbd_dev->dev);
4084 }
4085
4086 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4087
4088 /*
4089  * Get a unique rbd identifier for the given new rbd_dev, and add
4090  * the rbd_dev to the global list.  The minimum rbd id is 1.
4091  */
4092 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4093 {
4094         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4095
4096         spin_lock(&rbd_dev_list_lock);
4097         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4098         spin_unlock(&rbd_dev_list_lock);
4099         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4100                 (unsigned long long) rbd_dev->dev_id);
4101 }
4102
4103 /*
4104  * Remove an rbd_dev from the global list, and record that its
4105  * identifier is no longer in use.
4106  */
4107 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4108 {
4109         struct list_head *tmp;
4110         int rbd_id = rbd_dev->dev_id;
4111         int max_id;
4112
4113         rbd_assert(rbd_id > 0);
4114
4115         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4116                 (unsigned long long) rbd_dev->dev_id);
4117         spin_lock(&rbd_dev_list_lock);
4118         list_del_init(&rbd_dev->node);
4119
4120         /*
4121          * If the id being "put" is not the current maximum, there
4122          * is nothing special we need to do.
4123          */
4124         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4125                 spin_unlock(&rbd_dev_list_lock);
4126                 return;
4127         }
4128
4129         /*
4130          * We need to update the current maximum id.  Search the
4131          * list to find out what it is.  We're more likely to find
4132          * the maximum at the end, so search the list backward.
4133          */
4134         max_id = 0;
4135         list_for_each_prev(tmp, &rbd_dev_list) {
4136                 struct rbd_device *rbd_dev;
4137
4138                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4139                 if (rbd_dev->dev_id > max_id)
4140                         max_id = rbd_dev->dev_id;
4141         }
4142         spin_unlock(&rbd_dev_list_lock);
4143
4144         /*
4145          * The max id could have been updated by rbd_dev_id_get(), in
4146          * which case it now accurately reflects the new maximum.
4147          * Be careful not to overwrite the maximum value in that
4148          * case.
4149          */
4150         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4151         dout("  max dev id has been reset\n");
4152 }
4153
4154 /*
4155  * Skips over white space at *buf, and updates *buf to point to the
4156  * first found non-space character (if any). Returns the length of
4157  * the token (string of non-white space characters) found.  Note
4158  * that *buf must be terminated with '\0'.
4159  */
4160 static inline size_t next_token(const char **buf)
4161 {
4162         /*
4163          * These are the characters that produce nonzero for
4164          * isspace() in the "C" and "POSIX" locales.
4165          */
4166         const char *spaces = " \f\n\r\t\v";
4167
4168         *buf += strspn(*buf, spaces);   /* Find start of token */
4169
4170         return strcspn(*buf, spaces);   /* Return token length */
4171 }
4172
4173 /*
4174  * Finds the next token in *buf, and if the provided token buffer is
4175  * big enough, copies the found token into it.  The result, if
4176  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4177  * must be terminated with '\0' on entry.
4178  *
4179  * Returns the length of the token found (not including the '\0').
4180  * Return value will be 0 if no token is found, and it will be >=
4181  * token_size if the token would not fit.
4182  *
4183  * The *buf pointer will be updated to point beyond the end of the
4184  * found token.  Note that this occurs even if the token buffer is
4185  * too small to hold it.
4186  */
4187 static inline size_t copy_token(const char **buf,
4188                                 char *token,
4189                                 size_t token_size)
4190 {
4191         size_t len;
4192
4193         len = next_token(buf);
4194         if (len < token_size) {
4195                 memcpy(token, *buf, len);
4196                 *(token + len) = '\0';
4197         }
4198         *buf += len;
4199
4200         return len;
4201 }
4202
4203 /*
4204  * Finds the next token in *buf, dynamically allocates a buffer big
4205  * enough to hold a copy of it, and copies the token into the new
4206  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4207  * that a duplicate buffer is created even for a zero-length token.
4208  *
4209  * Returns a pointer to the newly-allocated duplicate, or a null
4210  * pointer if memory for the duplicate was not available.  If
4211  * the lenp argument is a non-null pointer, the length of the token
4212  * (not including the '\0') is returned in *lenp.
4213  *
4214  * If successful, the *buf pointer will be updated to point beyond
4215  * the end of the found token.
4216  *
4217  * Note: uses GFP_KERNEL for allocation.
4218  */
4219 static inline char *dup_token(const char **buf, size_t *lenp)
4220 {
4221         char *dup;
4222         size_t len;
4223
4224         len = next_token(buf);
4225         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4226         if (!dup)
4227                 return NULL;
4228         *(dup + len) = '\0';
4229         *buf += len;
4230
4231         if (lenp)
4232                 *lenp = len;
4233
4234         return dup;
4235 }
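
/*
 * Illustrative sketch (not part of the driver): next_token() and
 * dup_token() tokenize on the C-locale whitespace set using strspn()
 * and strcspn().  The same technique as a standalone user-space
 * program:
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *buf = "  1.2.3.4:6789 name=admin rbd myimage";
	const char *spaces = " \f\n\r\t\v";

	while (*(buf += strspn(buf, spaces))) {		/* skip spaces */
		size_t len = strcspn(buf, spaces);	/* token length */

		printf("token: %.*s\n", (int)len, buf);
		buf += len;
	}
	return 0;
}
#endif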
4236
4237 /*
4238  * Parse the options provided for an "rbd add" (i.e., rbd image
4239  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4240  * and the data written is passed here via a NUL-terminated buffer.
4241  * Returns 0 if successful or an error code otherwise.
4242  *
4243  * The information extracted from these options is recorded in
4244  * the other parameters which return dynamically-allocated
4245  * structures:
4246  *  ceph_opts
4247  *      The address of a pointer that will refer to a ceph options
4248  *      structure.  Caller must release the returned pointer using
4249  *      ceph_destroy_options() when it is no longer needed.
4250  *  rbd_opts
4251  *      Address of an rbd options pointer.  Fully initialized by
4252  *      this function; caller must release with kfree().
4253  *  spec
4254  *      Address of an rbd image specification pointer.  Fully
4255  *      initialized by this function based on parsed options.
4256  *      Caller must release with rbd_spec_put().
4257  *
4258  * The options passed take this form:
4259  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4260  * where:
4261  *  <mon_addrs>
4262  *      A comma-separated list of one or more monitor addresses.
4263  *      A monitor address is an ip address, optionally followed
4264  *      by a port number (separated by a colon).
4265  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4266  *  <options>
4267  *      A comma-separated list of ceph and/or rbd options.
4268  *  <pool_name>
4269  *      The name of the rados pool containing the rbd image.
4270  *  <image_name>
4271  *      The name of the image in that pool to map.
4272  *  <snap_id>
4273  *      An optional snapshot id.  If provided, the mapping will
4274  *      present data from the image at the time that snapshot was
4275  *      created.  The image head is used if no snapshot id is
4276  *      provided.  Snapshot mappings are always read-only.
4277  */
4278 static int rbd_add_parse_args(const char *buf,
4279                                 struct ceph_options **ceph_opts,
4280                                 struct rbd_options **opts,
4281                                 struct rbd_spec **rbd_spec)
4282 {
4283         size_t len;
4284         char *options;
4285         const char *mon_addrs;
4286         char *snap_name;
4287         size_t mon_addrs_size;
4288         struct rbd_spec *spec = NULL;
4289         struct rbd_options *rbd_opts = NULL;
4290         struct ceph_options *copts;
4291         int ret;
4292
4293         /* The first four tokens are required */
4294
4295         len = next_token(&buf);
4296         if (!len) {
4297                 rbd_warn(NULL, "no monitor address(es) provided");
4298                 return -EINVAL;
4299         }
4300         mon_addrs = buf;
4301         mon_addrs_size = len + 1;
4302         buf += len;
4303
4304         ret = -EINVAL;
4305         options = dup_token(&buf, NULL);
4306         if (!options)
4307                 return -ENOMEM;
4308         if (!*options) {
4309                 rbd_warn(NULL, "no options provided");
4310                 goto out_err;
4311         }
4312
4313         spec = rbd_spec_alloc();
4314         if (!spec)
4315                 goto out_mem;
4316
4317         spec->pool_name = dup_token(&buf, NULL);
4318         if (!spec->pool_name)
4319                 goto out_mem;
4320         if (!*spec->pool_name) {
4321                 rbd_warn(NULL, "no pool name provided");
4322                 goto out_err;
4323         }
4324
4325         spec->image_name = dup_token(&buf, NULL);
4326         if (!spec->image_name)
4327                 goto out_mem;
4328         if (!*spec->image_name) {
4329                 rbd_warn(NULL, "no image name provided");
4330                 goto out_err;
4331         }
4332
4333         /*
4334          * Snapshot name is optional; default is to use "-"
4335          * (indicating the head/no snapshot).
4336          */
4337         len = next_token(&buf);
4338         if (!len) {
4339                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4340                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4341         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4342                 ret = -ENAMETOOLONG;
4343                 goto out_err;
4344         }
4345         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4346         if (!snap_name)
4347                 goto out_mem;
4348         *(snap_name + len) = '\0';
4349         spec->snap_name = snap_name;
4350
4351         /* Initialize all rbd options to the defaults */
4352
4353         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4354         if (!rbd_opts)
4355                 goto out_mem;
4356
4357         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4358
4359         copts = ceph_parse_options(options, mon_addrs,
4360                                         mon_addrs + mon_addrs_size - 1,
4361                                         parse_rbd_opts_token, rbd_opts);
4362         if (IS_ERR(copts)) {
4363                 ret = PTR_ERR(copts);
4364                 goto out_err;
4365         }
4366         kfree(options);
4367
4368         *ceph_opts = copts;
4369         *opts = rbd_opts;
4370         *rbd_spec = spec;
4371
4372         return 0;
4373 out_mem:
4374         ret = -ENOMEM;
4375 out_err:
4376         kfree(rbd_opts);
4377         rbd_spec_put(spec);
4378         kfree(options);
4379
4380         return ret;
4381 }
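
/*
 * Illustrative sketch (not part of the driver): the option string
 * parsed above arrives via a write to /sys/bus/rbd/add, e.g. from a
 * shell:
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * A minimal C equivalent; the monitor address, user name, pool, and
 * image name are placeholders:
 */
#if 0
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	const char *spec = "1.2.3.4:6789 name=admin rbd myimage";
	int fd = open("/sys/bus/rbd/add", O_WRONLY);

	if (fd < 0)
		return 1;
	if (write(fd, spec, strlen(spec)) != (ssize_t)strlen(spec)) {
		close(fd);
		return 1;
	}
	close(fd);
	return 0;
}
#endif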
4382
4383 /*
4384  * An rbd format 2 image has a unique identifier, distinct from the
4385  * name given to it by the user.  Internally, that identifier is
4386  * what's used to specify the names of objects related to the image.
4387  *
4388  * A special "rbd id" object is used to map an rbd image name to its
4389  * id.  If that object doesn't exist, then there is no v2 rbd image
4390  * with the supplied name.
4391  *
4392  * This function will record the given rbd_dev's image_id field if
4393  * it can be determined, and in that case will return 0.  If any
4394  * errors occur a negative errno will be returned and the rbd_dev's
4395  * image_id field will be unchanged (and should be NULL).
4396  */
4397 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4398 {
4399         int ret;
4400         size_t size;
4401         char *object_name;
4402         void *response;
4403         char *image_id;
4404
4405         /*
4406          * When probing a parent image, the image id is already
4407          * known (and the image name likely is not).  There's no
4408          * need to fetch the image id again in this case.  We
4409          * do still need to set the image format though.
4410          */
4411         if (rbd_dev->spec->image_id) {
4412                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4413
4414                 return 0;
4415         }
4416
4417         /*
4418          * First, see if the format 2 image id file exists, and if
4419          * so, get the image's persistent id from it.
4420          */
4421         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4422         object_name = kmalloc(size, GFP_NOIO);
4423         if (!object_name)
4424                 return -ENOMEM;
4425         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4426         dout("rbd id object name is %s\n", object_name);
4427
4428         /* Response will be an encoded string, which includes a length */
4429
4430         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4431         response = kzalloc(size, GFP_NOIO);
4432         if (!response) {
4433                 ret = -ENOMEM;
4434                 goto out;
4435         }
4436
4437         /* If it doesn't exist we'll assume it's a format 1 image */
4438
4439         ret = rbd_obj_method_sync(rbd_dev, object_name,
4440                                 "rbd", "get_id", NULL, 0,
4441                                 response, RBD_IMAGE_ID_LEN_MAX);
4442         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4443         if (ret == -ENOENT) {
4444                 image_id = kstrdup("", GFP_KERNEL);
4445                 ret = image_id ? 0 : -ENOMEM;
4446                 if (!ret)
4447                         rbd_dev->image_format = 1;
4448         } else if (ret > (int)sizeof (__le32)) {
4449                 void *p = response;
4450
4451                 image_id = ceph_extract_encoded_string(&p, p + ret,
4452                                                 NULL, GFP_NOIO);
4453                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4454                 if (!ret)
4455                         rbd_dev->image_format = 2;
4456         } else if (ret >= 0) {
4457                 ret = -EINVAL;
4458         }
4459
4460         if (!ret) {
4461                 rbd_dev->spec->image_id = image_id;
4462                 dout("image_id is %s\n", image_id);
4463         }
4464 out:
4465         kfree(response);
4466         kfree(object_name);
4467
4468         return ret;
4469 }
4470
4471 /* Undo whatever state changes are made by v1 or v2 image probe */
4472
4473 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4474 {
4475         struct rbd_image_header *header;
4476
4477         rbd_dev_remove_parent(rbd_dev);
4478         rbd_spec_put(rbd_dev->parent_spec);
4479         rbd_dev->parent_spec = NULL;
4480         rbd_dev->parent_overlap = 0;
4481
4482         /* Free dynamic fields from the header, then zero it out */
4483
4484         header = &rbd_dev->header;
4485         ceph_put_snap_context(header->snapc);
4486         kfree(header->snap_sizes);
4487         kfree(header->snap_names);
4488         kfree(header->object_prefix);
4489         memset(header, 0, sizeof (*header));
4490 }
4491
4492 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4493 {
4494         int ret;
4495
4496         /* Populate rbd image metadata */
4497
4498         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4499         if (ret < 0)
4500                 goto out_err;
4501
4502         /* Version 1 images have no parent (no layering) */
4503
4504         rbd_dev->parent_spec = NULL;
4505         rbd_dev->parent_overlap = 0;
4506
4507         dout("discovered version 1 image, header name is %s\n",
4508                 rbd_dev->header_name);
4509
4510         return 0;
4511
4512 out_err:
4513         kfree(rbd_dev->header_name);
4514         rbd_dev->header_name = NULL;
4515         kfree(rbd_dev->spec->image_id);
4516         rbd_dev->spec->image_id = NULL;
4517
4518         return ret;
4519 }
4520
4521 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4522 {
4523         int ret;
4524
4525         ret = rbd_dev_v2_image_size(rbd_dev);
4526         if (ret)
4527                 goto out_err;
4528
4529         /* Get the object prefix (a.k.a. block_name) for the image */
4530
4531         ret = rbd_dev_v2_object_prefix(rbd_dev);
4532         if (ret)
4533                 goto out_err;
4534
4535         /* Get and check the features for the image */
4536
4537         ret = rbd_dev_v2_features(rbd_dev);
4538         if (ret)
4539                 goto out_err;
4540
4541         /* If the image supports layering, get the parent info */
4542
4543         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4544                 ret = rbd_dev_v2_parent_info(rbd_dev);
4545                 if (ret)
4546                         goto out_err;
4547                 /*
4548                  * Warn that layering is experimental if this image
4549                  * has a parent.  Skip the warning when the image
4550                  * being probed is itself a parent: at this point a
4551                  * parent's pool name is not yet known (only its
4552                  * pool id), which is what the pool_name test checks.
4553                  */
4554                 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4555                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4556                                         "is EXPERIMENTAL!");
4557         }
4558
4559         /* If the image supports fancy striping, get its parameters */
4560
4561         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4562                 ret = rbd_dev_v2_striping_info(rbd_dev);
4563                 if (ret < 0)
4564                         goto out_err;
4565         }
4566
4567         /* crypto and compression type aren't (yet) supported for v2 images */
4568
4569         rbd_dev->header.crypt_type = 0;
4570         rbd_dev->header.comp_type = 0;
4571
4572         /* Get the snapshot context for the image */
4573
4574         ret = rbd_dev_v2_snap_context(rbd_dev);
4575         if (ret)
4576                 goto out_err;
4577
4578         dout("discovered version 2 image, header name is %s\n",
4579                 rbd_dev->header_name);
4580
4581         return 0;
4582 out_err:
4583         rbd_dev->parent_overlap = 0;
4584         rbd_spec_put(rbd_dev->parent_spec);
4585         rbd_dev->parent_spec = NULL;
4586         kfree(rbd_dev->header_name);
4587         rbd_dev->header_name = NULL;
4588         kfree(rbd_dev->header.object_prefix);
4589         rbd_dev->header.object_prefix = NULL;
4590
4591         return ret;
4592 }
4593
4594 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4595 {
4596         struct rbd_device *parent = NULL;
4597         struct rbd_spec *parent_spec;
4598         struct rbd_client *rbdc;
4599         int ret;
4600
4601         if (!rbd_dev->parent_spec)
4602                 return 0;
4603         /*
4604          * We need to pass a reference to the client and the parent
4605          * spec when creating the parent rbd_dev.  Images related by
4606          * parent/child relationships always share both.
4607          */
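             /*
              * Note that rbd_dev_image_probe() below calls back into
              * rbd_dev_probe_parent(), so an entire ancestor chain is
              * probed, terminating at an image with no parent spec.
              */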
4608         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4609         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4610
4611         ret = -ENOMEM;
4612         parent = rbd_dev_create(rbdc, parent_spec);
4613         if (!parent)
4614                 goto out_err;
4615
4616         ret = rbd_dev_image_probe(parent, true);
4617         if (ret < 0)
4618                 goto out_err;
4619         rbd_dev->parent = parent;
4620
4621         return 0;
4622 out_err:
4623         if (parent) {
4624                 rbd_spec_put(rbd_dev->parent_spec);
4625                 kfree(rbd_dev->header_name);
                     rbd_dev->header_name = NULL;    /* the caller's unwind frees it too */
4626                 rbd_dev_destroy(parent);
4627         } else {
4628                 rbd_put_client(rbdc);
4629                 rbd_spec_put(parent_spec);
4630         }
4631
4632         return ret;
4633 }
4634
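     /*
      * Set up the Linux block device half of the mapping: allocate a
      * device id and name, register a block major, create the gendisk,
      * record the mapped size, add the sysfs device, and finally
      * announce the disk.  The error labels below unwind these steps
      * in reverse order.
      */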
4635 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4636 {
4637         int ret;
4638
4639         /* generate unique id: find highest unique id, add one */
4640         rbd_dev_id_get(rbd_dev);
4641
4642         /* Fill in the device name, now that we have its id. */
4643         BUILD_BUG_ON(DEV_NAME_LEN
4644                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4645         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4646
4647         /* Get our block major device number. */
4648
4649         ret = register_blkdev(0, rbd_dev->name);
4650         if (ret < 0)
4651                 goto err_out_id;
4652         rbd_dev->major = ret;
4653
4654         /* Set up the blkdev mapping. */
4655
4656         ret = rbd_init_disk(rbd_dev);
4657         if (ret)
4658                 goto err_out_blkdev;
4659
4660         ret = rbd_dev_mapping_set(rbd_dev);
4661         if (ret)
4662                 goto err_out_disk;
4663         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4664
4665         ret = rbd_bus_add_dev(rbd_dev);
4666         if (ret)
4667                 goto err_out_mapping;
4668
4669         /* Everything's ready.  Announce the disk to the world. */
4670
4671         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4672         add_disk(rbd_dev->disk);
4673
4674         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4675                 (unsigned long long) rbd_dev->mapping.size);
4676
4677         return ret;
4678
4679 err_out_mapping:
4680         rbd_dev_mapping_clear(rbd_dev);
4681 err_out_disk:
4682         rbd_free_disk(rbd_dev);
4683 err_out_blkdev:
4684         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4685 err_out_id:
4686         rbd_dev_id_put(rbd_dev);
4688
4689         return ret;
4690 }
4691
4692 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4693 {
4694         struct rbd_spec *spec = rbd_dev->spec;
4695         size_t size;
4696
4697         /* Record the header object name for this rbd image. */
4698
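             /*
              * The names built below follow the on-disk conventions from
              * rbd_types.h: "<image_name>.rbd" (RBD_SUFFIX) for format 1
              * images, and "rbd_header.<image_id>" (RBD_HEADER_PREFIX)
              * for format 2 images.
              */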
4699         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4700
4701         if (rbd_dev->image_format == 1)
4702                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4703         else
4704                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4705
4706         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4707         if (!rbd_dev->header_name)
4708                 return -ENOMEM;
4709
4710         if (rbd_dev->image_format == 1)
4711                 sprintf(rbd_dev->header_name, "%s%s",
4712                         spec->image_name, RBD_SUFFIX);
4713         else
4714                 sprintf(rbd_dev->header_name, "%s%s",
4715                         RBD_HEADER_PREFIX, spec->image_id);
4716         return 0;
4717 }
4718
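     /*
      * Undo what rbd_dev_image_probe() set up: unprobe the image, tear
      * down the header watch, drop the header name and image id, and
      * destroy the rbd_dev itself.  Callers must not touch rbd_dev
      * after this returns.
      */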
4719 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4720 {
4721         int ret;
4722
4723         rbd_dev_unprobe(rbd_dev);
4724         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4725         if (ret)
4726                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4727         kfree(rbd_dev->header_name);
4728         rbd_dev->header_name = NULL;
4729         rbd_dev->image_format = 0;
4730         kfree(rbd_dev->spec->image_id);
4731         rbd_dev->spec->image_id = NULL;
4732
4733         rbd_dev_destroy(rbd_dev);
4734 }
4735
4736 /*
4737  * Probe for the existence of the header object for the given rbd
4738  * device.  For format 2 images this includes determining the image
4739  * id.
4740  */
4741 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4742 {
4743         int ret;
4744         int tmp;
4745
4746         /*
4747          * Get the id from the image id object.  If it's not a
4748          * format 2 image, we'll get ENOENT back, and we'll assume
4749          * it's a format 1 image.
4750          */
4751         ret = rbd_dev_image_id(rbd_dev);
4752         if (ret)
4753                 return ret;
4754         rbd_assert(rbd_dev->spec->image_id);
4755         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4756
4757         ret = rbd_dev_header_name(rbd_dev);
4758         if (ret)
4759                 goto err_out_format;
4760
4761         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4762         if (ret)
4763                 goto out_header_name;
4764
4765         if (rbd_dev->image_format == 1)
4766                 ret = rbd_dev_v1_probe(rbd_dev);
4767         else
4768                 ret = rbd_dev_v2_probe(rbd_dev);
4769         if (ret)
4770                 goto err_out_watch;
4771
4772         ret = rbd_dev_spec_update(rbd_dev);
4773         if (ret)
4774                 goto err_out_probe;
4775
4776         /* If we are mapping a snapshot it must be marked read-only */
4777
4778         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4779                 read_only = true;
4780         rbd_dev->mapping.read_only = read_only;
4781
4782         ret = rbd_dev_probe_parent(rbd_dev);
4783         if (!ret)
4784                 return 0;
4785
4786 err_out_probe:
4787         rbd_dev_unprobe(rbd_dev);
4788 err_out_watch:
4789         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4790         if (tmp)
4791                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4792 out_header_name:
4793         kfree(rbd_dev->header_name);
4794         rbd_dev->header_name = NULL;
4795 err_out_format:
4796         rbd_dev->image_format = 0;
4797         kfree(rbd_dev->spec->image_id);
4798         rbd_dev->spec->image_id = NULL;
4799
4800         dout("probe failed, returning %d\n", ret);
4801
4802         return ret;
4803 }
4804
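     /*
      * Handle a write to /sys/bus/rbd/add.  The buffer is expected to
      * hold monitor address(es), options, a pool name, an image name,
      * and optionally a snapshot name (see
      * Documentation/ABI/testing/sysfs-bus-rbd), e.g. something like:
      *
      *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
      *           > /sys/bus/rbd/add
      */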
4805 static ssize_t rbd_add(struct bus_type *bus,
4806                        const char *buf,
4807                        size_t count)
4808 {
4809         struct rbd_device *rbd_dev = NULL;
4810         struct ceph_options *ceph_opts = NULL;
4811         struct rbd_options *rbd_opts = NULL;
4812         struct rbd_spec *spec = NULL;
4813         struct rbd_client *rbdc;
4814         struct ceph_osd_client *osdc;
4815         bool read_only;
4816         int rc = -ENOMEM;
4817
4818         if (!try_module_get(THIS_MODULE))
4819                 return -ENODEV;
4820
4821         /* parse add command */
4822         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4823         if (rc < 0)
4824                 goto err_out_module;
4825         read_only = rbd_opts->read_only;
4826         kfree(rbd_opts);
4827         rbd_opts = NULL;        /* done with this */
4828
4829         rbdc = rbd_get_client(ceph_opts);
4830         if (IS_ERR(rbdc)) {
4831                 rc = PTR_ERR(rbdc);
4832                 goto err_out_args;
4833         }
4834         ceph_opts = NULL;       /* rbd_dev client now owns this */
4835
4836         /* pick the pool */
4837         osdc = &rbdc->client->osdc;
4838         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4839         if (rc < 0)
4840                 goto err_out_client;
4841         spec->pool_id = (u64)rc;
4842
4843         /* The ceph file layout needs to fit pool id in 32 bits */
4844
4845         if (spec->pool_id > (u64)U32_MAX) {
4846                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4847                                 (unsigned long long)spec->pool_id, U32_MAX);
4848                 rc = -EIO;
4849                 goto err_out_client;
4850         }
4851
4852         rbd_dev = rbd_dev_create(rbdc, spec);
4853         if (!rbd_dev) {
4854                 rc = -ENOMEM;   /* rc still holds the nonnegative pool id here */
                     goto err_out_client;
             }
4855         rbdc = NULL;            /* rbd_dev now owns this */
4856         spec = NULL;            /* rbd_dev now owns this */
4857
4858         rc = rbd_dev_image_probe(rbd_dev, read_only);
4859         if (rc < 0)
4860                 goto err_out_rbd_dev;
4861
4862         rc = rbd_dev_device_setup(rbd_dev);
4863         if (!rc)
4864                 return count;
4865
4866         rbd_dev_image_release(rbd_dev);
             goto err_out_module;    /* rbd_dev_image_release() destroyed rbd_dev */
4867 err_out_rbd_dev:
4868         rbd_dev_destroy(rbd_dev);
4869 err_out_client:
4870         rbd_put_client(rbdc);
4871 err_out_args:
4872         if (ceph_opts)
4873                 ceph_destroy_options(ceph_opts);
4874         kfree(rbd_opts);
4875         rbd_spec_put(spec);
4876 err_out_module:
4877         module_put(THIS_MODULE);
4878
4879         dout("Error adding device %s\n", buf);
4880
4881         return (ssize_t)rc;
4882 }
4883
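     /*
      * Look up a mapped device by its id.  The list walk itself is
      * protected by rbd_dev_list_lock, but the returned pointer is only
      * safe to use as long as the caller prevents removal (rbd_remove()
      * serializes on ctl_mutex).
      */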
4884 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4885 {
4886         struct list_head *tmp;
4887         struct rbd_device *rbd_dev;
4888
4889         spin_lock(&rbd_dev_list_lock);
4890         list_for_each(tmp, &rbd_dev_list) {
4891                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4892                 if (rbd_dev->dev_id == dev_id) {
4893                         spin_unlock(&rbd_dev_list_lock);
4894                         return rbd_dev;
4895                 }
4896         }
4897         spin_unlock(&rbd_dev_list_lock);
4898         return NULL;
4899 }
4900
4901 static void rbd_dev_device_release(struct device *dev)
4902 {
4903         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4904
4905         rbd_free_disk(rbd_dev);
4906         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4907         rbd_dev_mapping_clear(rbd_dev);
4908         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4909         rbd_dev->major = 0;
4910         rbd_dev_id_put(rbd_dev);
4912 }
4913
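     /*
      * Tear down a chain of parent images.  Each pass removes the
      * top-most ancestor (the one with no parent of its own), so the
      * chain is dismantled from the far end back toward rbd_dev.
      */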
4914 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4915 {
4916         while (rbd_dev->parent) {
4917                 struct rbd_device *first = rbd_dev;
4918                 struct rbd_device *second = first->parent;
4919                 struct rbd_device *third;
4920
4921                 /*
4922                  * Follow to the parent with no grandparent and
4923                  * remove it.
4924                  */
4925                 while (second && (third = second->parent)) {
4926                         first = second;
4927                         second = third;
4928                 }
4929                 rbd_assert(second);
4930                 rbd_dev_image_release(second);
4931                 first->parent = NULL;
4932                 first->parent_overlap = 0;
4933
4934                 rbd_assert(first->parent_spec);
4935                 rbd_spec_put(first->parent_spec);
4936                 first->parent_spec = NULL;
4937         }
4938 }
4939
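     /*
      * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
      * decimal device id that rbd_add() assigned, e.g.:
      *
      *   echo 1 > /sys/bus/rbd/remove
      */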
4940 static ssize_t rbd_remove(struct bus_type *bus,
4941                           const char *buf,
4942                           size_t count)
4943 {
4944         struct rbd_device *rbd_dev = NULL;
4945         int target_id;
4946         unsigned long ul;
4947         int ret;
4948
4949         ret = strict_strtoul(buf, 10, &ul);
4950         if (ret)
4951                 return ret;
4952
4953         /* convert to int; abort if we lost anything in the conversion */
4954         target_id = (int) ul;
4955         if (target_id != ul)
4956                 return -EINVAL;
4957
4958         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4959
4960         rbd_dev = __rbd_get_dev(target_id);
4961         if (!rbd_dev) {
4962                 ret = -ENOENT;
4963                 goto done;
4964         }
4965
4966         spin_lock_irq(&rbd_dev->lock);
4967         if (rbd_dev->open_count)
4968                 ret = -EBUSY;
4969         else
4970                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4971         spin_unlock_irq(&rbd_dev->lock);
4972         if (ret < 0)
4973                 goto done;
4974         ret = count;
4975         rbd_bus_del_dev(rbd_dev);
4976         rbd_dev_image_release(rbd_dev);
4977         module_put(THIS_MODULE);
4978 done:
4979         mutex_unlock(&ctl_mutex);
4980
4981         return ret;
4982 }
4983
4984 /*
4985  * create control files in sysfs
4986  * /sys/bus/rbd/...
4987  */
4988 static int rbd_sysfs_init(void)
4989 {
4990         int ret;
4991
4992         ret = device_register(&rbd_root_dev);
4993         if (ret < 0)
4994                 return ret;
4995
4996         ret = bus_register(&rbd_bus_type);
4997         if (ret < 0)
4998                 device_unregister(&rbd_root_dev);
4999
5000         return ret;
5001 }
5002
5003 static void rbd_sysfs_cleanup(void)
5004 {
5005         bus_unregister(&rbd_bus_type);
5006         device_unregister(&rbd_root_dev);
5007 }
5008
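     /*
      * Create the slab caches used for image requests, object requests,
      * and segment (object) names; they are torn down again in
      * rbd_slab_exit().
      */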
5009 static int rbd_slab_init(void)
5010 {
5011         rbd_assert(!rbd_img_request_cache);
5012         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5013                                         sizeof (struct rbd_img_request),
5014                                         __alignof__(struct rbd_img_request),
5015                                         0, NULL);
5016         if (!rbd_img_request_cache)
5017                 return -ENOMEM;
5018
5019         rbd_assert(!rbd_obj_request_cache);
5020         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5021                                         sizeof (struct rbd_obj_request),
5022                                         __alignof__(struct rbd_obj_request),
5023                                         0, NULL);
5024         if (!rbd_obj_request_cache)
5025                 goto out_err;
5026
5027         rbd_assert(!rbd_segment_name_cache);
5028         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5029                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5030         if (rbd_segment_name_cache)
5031                 return 0;
5032 out_err:
5033         if (rbd_obj_request_cache) {
5034                 kmem_cache_destroy(rbd_obj_request_cache);
5035                 rbd_obj_request_cache = NULL;
5036         }
5037
5038         kmem_cache_destroy(rbd_img_request_cache);
5039         rbd_img_request_cache = NULL;
5040
5041         return -ENOMEM;
5042 }
5043
5044 static void rbd_slab_exit(void)
5045 {
5046         rbd_assert(rbd_segment_name_cache);
5047         kmem_cache_destroy(rbd_segment_name_cache);
5048         rbd_segment_name_cache = NULL;
5049
5050         rbd_assert(rbd_obj_request_cache);
5051         kmem_cache_destroy(rbd_obj_request_cache);
5052         rbd_obj_request_cache = NULL;
5053
5054         rbd_assert(rbd_img_request_cache);
5055         kmem_cache_destroy(rbd_img_request_cache);
5056         rbd_img_request_cache = NULL;
5057 }
5058
5059 static int __init rbd_init(void)
5060 {
5061         int rc;
5062
5063         if (!libceph_compatible(NULL)) {
5064                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5065
5066                 return -EINVAL;
5067         }
5068         rc = rbd_slab_init();
5069         if (rc)
5070                 return rc;
5071         rc = rbd_sysfs_init();
5072         if (rc)
5073                 rbd_slab_exit();
5074         else
5075                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5076
5077         return rc;
5078 }
5079
5080 static void __exit rbd_exit(void)
5081 {
5082         rbd_sysfs_cleanup();
5083         rbd_slab_exit();
5084 }
5085
5086 module_init(rbd_init);
5087 module_exit(rbd_exit);
5088
5089 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5090 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5091 MODULE_DESCRIPTION("rados block device");
5092
5093 /* following authorship retained from original osdblk.c */
5094 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5095
5096 MODULE_LICENSE("GPL");