rbd: clean up a few dout() calls
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Driver name, short form (device prefix) and long form (banner) */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32	/* longest accepted snapshot name */
#define RBD_MAX_OPT_LEN		1024	/* longest accepted options string */

/* Sentinel snapshot name meaning "no snapshot, map the live image head" */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default value for the notify_timeout mount option, in seconds */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes */
	char *object_prefix;	/* NUL-terminated object name prefix */
	__u8 obj_order;		/* object size is (1 << obj_order) bytes */
	__u8 crypt_type;	/* crypt type copied from on-disk header */
	__u8 comp_type;		/* compression type from on-disk header */
	struct ceph_snap_context *snapc;	/* refcounted snapshot ids */
	size_t snap_names_len;	/* total size of the snap_names buffer */
	u32 total_snaps;	/* number of snapshots in this header */

	char *snap_names;	/* concatenated NUL-terminated snap names */
	u64 *snap_sizes;	/* per-snapshot image size, in bytes */

	u64 obj_version;	/* NOTE(review): maintained by header
				 * fetch/refresh code outside this chunk */
};
92
/* Parsed rbd-specific mount options (see rbd_opts_tokens below) */
struct rbd_options {
	int	notify_timeout;		/* notify_timeout=<int>, seconds */
};
96
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph cluster client */
	struct rbd_options	*rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref		kref;		/* one ref per sharing rbd_device */
	struct list_head	node;		/* entry in rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* number of bytes completed */
};
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* request length in bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* collection, NULL if none */
};
137
/* In-core representation of one snapshot, exposed as a sysfs device */
struct rbd_snap {
	struct	device		dev;	/* embedded sysfs device */
	const char		*name;	/* snapshot name */
	u64			size;	/* image size at snapshot time */
	struct list_head	node;	/* entry in rbd_dev->snaps */
	u64			id;	/* ceph snapshot id */
};
145
/*
 * a single device
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;		/* blkdev request queue */

	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* parsed image header */
	char			*image_name;	/* rbd image name */
	size_t			image_name_len;
	char			*header_name;	/* name of header object */
	char			*pool_name;	/* pool the image lives in */
	int			pool_id;	/* id of that pool */

	struct ceph_osd_event   *watch_event;	/* header watch registration */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;
	/* name of the snapshot this device reads from */
	char                    *snap_name;
	/* id of the snapshot this device reads from */
	u64                     snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool                    snap_exists;
	int                     read_only;	/* nonzero for snapshot mappings */

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
190
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* protects rbd_client_list */
198
/* Forward declarations for snapshot and sysfs handlers defined below. */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);

/* Bus-level add/remove, the user-facing mapping interface. */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
212
/* /sys/bus/rbd/{add,remove}: root-writable mapping control files */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
223
/*
 * Release callback for rbd_root_dev.  The root device is static, so
 * there is nothing to free; the driver core just requires a non-NULL
 * release method.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent device of every mapped rbd device in sysfs */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
232
233
/* Take a reference on the rbd device via its embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
238
/* Drop a reference taken with rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
243
244 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
245
246 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 {
248         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249
250         rbd_get_dev(rbd_dev);
251
252         set_device_ro(bdev, rbd_dev->read_only);
253
254         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255                 return -EROFS;
256
257         return 0;
258 }
259
/* Release the reference taken by rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
268
/* Block-layer entry points for rbd devices */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
274
275 /*
276  * Initialize an rbd client instance.
277  * We own *ceph_opts.
278  */
279 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
280                                             struct rbd_options *rbd_opts)
281 {
282         struct rbd_client *rbdc;
283         int ret = -ENOMEM;
284
285         dout("rbd_client_create\n");
286         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287         if (!rbdc)
288                 goto out_opt;
289
290         kref_init(&rbdc->kref);
291         INIT_LIST_HEAD(&rbdc->node);
292
293         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
295         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
296         if (IS_ERR(rbdc->client))
297                 goto out_mutex;
298         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
299
300         ret = ceph_open_session(rbdc->client);
301         if (ret < 0)
302                 goto out_err;
303
304         rbdc->rbd_opts = rbd_opts;
305
306         spin_lock(&rbd_client_list_lock);
307         list_add_tail(&rbdc->node, &rbd_client_list);
308         spin_unlock(&rbd_client_list_lock);
309
310         mutex_unlock(&ctl_mutex);
311
312         dout("rbd_client_create created %p\n", rbdc);
313         return rbdc;
314
315 out_err:
316         ceph_destroy_client(rbdc->client);
317 out_mutex:
318         mutex_unlock(&ctl_mutex);
319         kfree(rbdc);
320 out_opt:
321         if (ceph_opts)
322                 ceph_destroy_options(ceph_opts);
323         return ERR_PTR(ret);
324 }
325
326 /*
327  * Find a ceph client with specific addr and configuration.
328  */
329 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
330 {
331         struct rbd_client *client_node;
332
333         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334                 return NULL;
335
336         list_for_each_entry(client_node, &rbd_client_list, node)
337                 if (!ceph_compare_options(ceph_opts, client_node->client))
338                         return client_node;
339         return NULL;
340 }
341
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,	/* notify_timeout=<seconds> */
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
359
/*
 * Callback from ceph_parse_options() for each option it does not
 * recognize itself.  @c is one "key=value" option string, @private is
 * the struct rbd_options being filled in.  Returns 0 on success or a
 * negative errno for a malformed option.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* intval is only assigned (and only used) for integer tokens */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* every token in rbd_opts_tokens must be handled above */
		BUG_ON(token);
	}
	return 0;
}
394
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success the returned client holds (or is) a reference the caller
 * must drop with rbd_put_client().  rbd_opts is freed here on every
 * path except when a newly created client takes ownership of it.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* rbd-specific options are handled by parse_rbd_opts_token() */
	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() owns ceph_opts from here on */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
442
/*
 * Destroy ceph client
 *
 * Called via kref_put() once the last reference is dropped.  Takes
 * rbd_client_list_lock itself to unlink the client, so the caller
 * must NOT already hold that lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
461
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 *
 * NOTE(review): the kref is decremented without rbd_client_list_lock
 * held, while __rbd_client_find() can concurrently look clients up
 * under that lock — there is a window where a lookup can find a
 * client whose refcount has already hit zero.  Confirm against the
 * locking scheme of the full file before relying on this.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
471
/*
 * Destroy requests collection
 *
 * kref release callback; frees the collection and its trailing
 * status array in one allocation.
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
483
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @allocated_snaps is the snapshot count the caller sized @ondisk for;
 * snapshot ids/sizes/names are only copied when it matches the count
 * actually found on disk (otherwise the caller is expected to retry
 * with a correctly sized buffer).
 *
 * Returns 0 on success, -ENXIO if the header magic text does not
 * match, -EINVAL on an implausible snapshot count, -ENOMEM on
 * allocation failure.  On failure nothing is left allocated.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* guard the snapc allocation size computation against overflow */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	/*
	 * NOTE(review): snap_names_len comes straight from the on-disk
	 * header and is used unchecked as an allocation and memcpy
	 * size below — confirm an upper bound is enforced by callers.
	 */
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* on-disk block_name is not NUL-terminated; terminate our copy */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					gfp_flags);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
565
566 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567                         u64 *seq, u64 *size)
568 {
569         int i;
570         char *p = header->snap_names;
571
572         for (i = 0; i < header->total_snaps; i++) {
573                 if (!strcmp(snap_name, p)) {
574
575                         /* Found it.  Pass back its id and/or size */
576
577                         if (seq)
578                                 *seq = header->snapc->snaps[i];
579                         if (size)
580                                 *size = header->snap_sizes[i];
581                         return i;
582                 }
583                 p += strlen(p) + 1;     /* Skip ahead to the next name */
584         }
585         return -ENOENT;
586 }
587
/*
 * Point the device at the snapshot named in rbd_dev->snap_name,
 * updating snap_id/snap_exists/read_only accordingly.  The sentinel
 * name RBD_SNAP_HEAD_NAME selects the live image head (writable);
 * any other name must match an existing snapshot (mapped read-only).
 * If @size is non-NULL it receives the image size for the selection.
 * Returns 0 on success or -ENOENT for an unknown snapshot name.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
618
619 static void rbd_header_free(struct rbd_image_header *header)
620 {
621         kfree(header->object_prefix);
622         kfree(header->snap_sizes);
623         kfree(header->snap_names);
624         ceph_put_snap_context(header->snapc);
625 }
626
627 /*
628  * get the actual striped segment name, offset and length
629  */
630 static u64 rbd_get_segment(struct rbd_image_header *header,
631                            const char *object_prefix,
632                            u64 ofs, u64 len,
633                            char *seg_name, u64 *segofs)
634 {
635         u64 seg = ofs >> header->obj_order;
636
637         if (seg_name)
638                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
639                          "%s.%012llx", object_prefix, seg);
640
641         ofs = ofs & ((1 << header->obj_order) - 1);
642         len = min_t(u64, len, (1 << header->obj_order) - ofs);
643
644         if (segofs)
645                 *segofs = ofs;
646
647         return len;
648 }
649
650 static int rbd_get_num_segments(struct rbd_image_header *header,
651                                 u64 ofs, u64 len)
652 {
653         u64 start_seg = ofs >> header->obj_order;
654         u64 end_seg = (ofs + len - 1) >> header->obj_order;
655         return end_seg - start_seg + 1;
656 }
657
658 /*
659  * returns the size of an object in the image
660  */
661 static u64 rbd_obj_bytes(struct rbd_image_header *header)
662 {
663         return 1 << header->obj_order;
664 }
665
666 /*
667  * bio helpers
668  */
669
670 static void bio_chain_put(struct bio *chain)
671 {
672         struct bio *tmp;
673
674         while (chain) {
675                 tmp = chain;
676                 chain = chain->bi_next;
677                 bio_put(tmp);
678         }
679 }
680
/*
 * zeros a bio chain, starting at specific offset
 *
 * Every data byte at or beyond @start_ofs (measured from the start of
 * the chain) is cleared; used to zero-fill short or missing reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset of the current segment in the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero the tail of this segment past start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
707
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until @len bytes are covered.  *old is
 * advanced past the consumed bios, *next is set to the first
 * unconsumed bio (or the second half of a split), and the cloned
 * chain is returned (NULL on allocation/split failure).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release the bio_pair left over from a previous call, if any */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/* NOTE(review): this inner bp shadows the bp parameter */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			/* NOTE(review): format string lacks a space before
			 * "bi_size=" — adjacent literals concatenate */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* subsequent allocations in this loop must not block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* the source chain must have covered the requested length */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
783
784 /*
785  * helpers for osd request op vectors.
786  */
787 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
788                             int num_ops,
789                             int opcode,
790                             u32 payload_len)
791 {
792         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
793                        GFP_NOIO);
794         if (!*ops)
795                 return -ENOMEM;
796         (*ops)[0].op = opcode;
797         /*
798          * op extent offset and length will be set later on
799          * in calc_raw_layout()
800          */
801         (*ops)[0].payload_len = payload_len;
802         return 0;
803 }
804
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
809
/*
 * Record completion of sub-request @index of @coll and complete as
 * much of the block-layer request @rq as is now finished in order.
 * With no collection the whole request is completed directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	/* find the run of consecutively finished sub-requests */
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	/* complete them in order; each drops one collection reference */
	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
847
/* Complete the sub-request described by @req with the given result. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
853
/*
 * Send ceph osd request
 *
 * Builds and submits one OSD request for @object_name covering
 * [ofs, ofs+len), carrying either @bio data or @pages.  If @rbd_cb is
 * NULL the call is synchronous and waits for completion; otherwise
 * the callback owns completion.  When @linger_req is non-NULL the
 * request is registered as lingering (used for watch) and passed back
 * to the caller.  @ver, when non-NULL, receives the reassert version
 * on synchronous completion.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/*
	 * NOTE(review): the message header snapid is forced to
	 * CEPH_NOSNAP while the @snapid argument is only fed to
	 * ceph_calc_raw_layout() below — confirm this is intended.
	 */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* single object, striping handled by rbd's own segmenting */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous: wait here and drop the request ourselves */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
965
966 /*
967  * Ceph osd op callback
968  */
969 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
970 {
971         struct rbd_request *req_data = req->r_priv;
972         struct ceph_osd_reply_head *replyhead;
973         struct ceph_osd_op *op;
974         __s32 rc;
975         u64 bytes;
976         int read_op;
977
978         /* parse reply */
979         replyhead = msg->front.iov_base;
980         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
981         op = (void *)(replyhead + 1);
982         rc = le32_to_cpu(replyhead->result);
983         bytes = le64_to_cpu(op->extent.length);
984         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
985
986         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
987                 (unsigned long long) bytes, read_op, (int) rc);
988
989         if (rc == -ENOENT && read_op) {
990                 zero_bio_chain(req_data->bio, 0);
991                 rc = 0;
992         } else if (rc == 0 && read_op && bytes < req_data->len) {
993                 zero_bio_chain(req_data->bio, bytes);
994                 bytes = req_data->len;
995         }
996
997         rbd_coll_end_req(req_data, rc, bytes);
998
999         if (req_data->bio)
1000                 bio_chain_put(req_data->bio);
1001
1002         ceph_osdc_put_request(req);
1003         kfree(req_data);
1004 }
1005
/*
 * Minimal completion callback: just drop the request reference.
 * Used for requests whose reply carries nothing we need (e.g. the
 * notify-ack issued from rbd_req_sync_notify_ack()).
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1010
1011 /*
1012  * Do a synchronous ceph osd operation
1013  */
/*
 * Do a synchronous ceph osd operation
 *
 * Issues a single osd request against @object_name and waits for it to
 * complete (rbd_do_request() waits when given a NULL callback).  If
 * @orig_ops is NULL, a one-op vector for @opcode is built here and
 * destroyed before returning; otherwise the caller's ops are used and
 * the caller retains ownership.  For writes, @buf (when non-NULL) is
 * copied into a freshly allocated page vector before issuing; for
 * reads the received data is copied back into @buf.  @linger_req and
 * @ver are passed straight through to rbd_do_request().
 *
 * Returns the rbd_do_request() result (for reads, the byte count used
 * for the copy-out) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* writes carry the data as payload; reads carry none */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL callback makes rbd_do_request() wait for completion */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	/* only destroy ops we built ourselves */
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1071
1072 /*
1073  * Do an asynchronous ceph osd operation
1074  */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent (@ofs, @len) onto a single backing object
 * segment and issues @opcode for that segment.  Completion is
 * delivered through rbd_req_cb() into slot @coll_index of @coll.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate the image offset into object name + offset */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* writes carry their data as payload; reads carry none */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1126
1127 /*
1128  * Request async osd write
1129  */
1130 static int rbd_req_write(struct request *rq,
1131                          struct rbd_device *rbd_dev,
1132                          struct ceph_snap_context *snapc,
1133                          u64 ofs, u64 len,
1134                          struct bio *bio,
1135                          struct rbd_req_coll *coll,
1136                          int coll_index)
1137 {
1138         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1139                          CEPH_OSD_OP_WRITE,
1140                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1141                          ofs, len, bio, coll, coll_index);
1142 }
1143
1144 /*
1145  * Request async osd read
1146  */
1147 static int rbd_req_read(struct request *rq,
1148                          struct rbd_device *rbd_dev,
1149                          u64 snapid,
1150                          u64 ofs, u64 len,
1151                          struct bio *bio,
1152                          struct rbd_req_coll *coll,
1153                          int coll_index)
1154 {
1155         return rbd_do_op(rq, rbd_dev, NULL,
1156                          snapid,
1157                          CEPH_OSD_OP_READ,
1158                          CEPH_OSD_FLAG_READ,
1159                          ofs, len, bio, coll, coll_index);
1160 }
1161
1162 /*
1163  * Request sync osd read
1164  */
1165 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1166                           struct ceph_snap_context *snapc,
1167                           u64 snapid,
1168                           const char *object_name,
1169                           u64 ofs, u64 len,
1170                           char *buf,
1171                           u64 *ver)
1172 {
1173         return rbd_req_sync_op(rbd_dev, NULL,
1174                                snapid,
1175                                CEPH_OSD_OP_READ,
1176                                CEPH_OSD_FLAG_READ,
1177                                NULL,
1178                                object_name, ofs, len, buf, NULL, ver);
1179 }
1180
1181 /*
 * Request sync osd notify-ack
1183  */
/*
 * Acknowledge a received watch notification on @object_name so the
 * OSD stops waiting for us.  Completion is fire-and-forget via
 * rbd_simple_req_cb().
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id,
				   const char *object_name)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): notify_id is stored without cpu_to_le64(),
	 * unlike ver above -- confirm it is already little-endian
	 * when delivered to the watch callback */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1211
1212 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1213 {
1214         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1215         u64 hver;
1216         int rc;
1217
1218         if (!rbd_dev)
1219                 return;
1220
1221         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1222                 rbd_dev->header_name, (unsigned long long) notify_id,
1223                 (unsigned int) opcode);
1224         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1225         rc = __rbd_refresh_header(rbd_dev);
1226         hver = rbd_dev->header.obj_version;
1227         mutex_unlock(&ctl_mutex);
1228         if (rc)
1229                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230                            " update snaps: %d\n", rbd_dev->major, rc);
1231
1232         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
1233 }
1234
1235 /*
1236  * Request sync osd watch
1237  */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
			      const char *object_name,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* notifications on this event are delivered to rbd_watch_cb() */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* register (rbd_req_sync_unwatch() uses 0) */

	/* keep the request lingering: rbd_do_request() calls
	 * ceph_osdc_set_request_linger() when a linger_req pointer
	 * is supplied, and the request is saved in watch_request */
	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	/* undo event creation before releasing the ops vector */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1279
1280 /*
1281  * Request sync osd unwatch
1282  */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* flag == 0 with the original cookie tears the watch down */
	ops[0].watch.ver = 0;
	/* NOTE(review): assumes watch_event was set up by
	 * rbd_req_sync_watch() and is still non-NULL here -- a NULL
	 * watch_event would oops; confirm callers guarantee this */
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	/* the event is released regardless of the unwatch result */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1308
/* Context handed to rbd_notify_cb() via ceph_osdc_create_event()
 * in rbd_req_sync_notify() */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1312
1313 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314 {
1315         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316         if (!rbd_dev)
1317                 return;
1318
1319         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320                         rbd_dev->header_name, (unsigned long long) notify_id,
1321                         (unsigned int) opcode);
1322 }
1323
1324 /*
1325  * Request sync osd notify
1326  */
/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
			  const char *object_name)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	/* info lives on the stack; that is safe only because we wait
	 * for the event below before returning */
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* notify timeout, seconds */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	/* NOTE(review): the success path never cancels/releases the
	 * event, unlike fail_event below -- confirm whether
	 * ceph_osdc_wait_event() consumes it or this leaks */
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1374
1375 /*
 * Request sync osd class-method call (exec)
1377  */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	/* payload carries class name, method name and the input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (ret < 0)
		return ret;

	/* describe the class method call; @data/@class_name/@method_name
	 * must stay valid until the synchronous op below completes */
	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1414
1415 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416 {
1417         struct rbd_req_coll *coll =
1418                         kzalloc(sizeof(struct rbd_req_coll) +
1419                                 sizeof(struct rbd_req_status) * num_reqs,
1420                                 GFP_ATOMIC);
1421
1422         if (!coll)
1423                 return NULL;
1424         coll->total = num_reqs;
1425         kref_init(&coll->kref);
1426         return coll;
1427 }
1428
1429 /*
1430  * block device queue callback
1431  */
1432 static void rbd_rq_fn(struct request_queue *q)
1433 {
1434         struct rbd_device *rbd_dev = q->queuedata;
1435         struct request *rq;
1436         struct bio_pair *bp = NULL;
1437
1438         while ((rq = blk_fetch_request(q))) {
1439                 struct bio *bio;
1440                 struct bio *rq_bio, *next_bio = NULL;
1441                 bool do_write;
1442                 unsigned int size;
1443                 u64 op_size = 0;
1444                 u64 ofs;
1445                 int num_segs, cur_seg = 0;
1446                 struct rbd_req_coll *coll;
1447                 struct ceph_snap_context *snapc;
1448
1449                 /* peek at request from block layer */
1450                 if (!rq)
1451                         break;
1452
1453                 dout("fetched request\n");
1454
1455                 /* filter out block requests we don't understand */
1456                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1457                         __blk_end_request_all(rq, 0);
1458                         continue;
1459                 }
1460
1461                 /* deduce our operation (read, write) */
1462                 do_write = (rq_data_dir(rq) == WRITE);
1463
1464                 size = blk_rq_bytes(rq);
1465                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1466                 rq_bio = rq->bio;
1467                 if (do_write && rbd_dev->read_only) {
1468                         __blk_end_request_all(rq, -EROFS);
1469                         continue;
1470                 }
1471
1472                 spin_unlock_irq(q->queue_lock);
1473
1474                 down_read(&rbd_dev->header_rwsem);
1475
1476                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477                         up_read(&rbd_dev->header_rwsem);
1478                         dout("request for non-existent snapshot");
1479                         spin_lock_irq(q->queue_lock);
1480                         __blk_end_request_all(rq, -ENXIO);
1481                         continue;
1482                 }
1483
1484                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1485
1486                 up_read(&rbd_dev->header_rwsem);
1487
1488                 dout("%s 0x%x bytes at 0x%llx\n",
1489                      do_write ? "write" : "read",
1490                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1491
1492                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493                 coll = rbd_alloc_coll(num_segs);
1494                 if (!coll) {
1495                         spin_lock_irq(q->queue_lock);
1496                         __blk_end_request_all(rq, -ENOMEM);
1497                         ceph_put_snap_context(snapc);
1498                         continue;
1499                 }
1500
1501                 do {
1502                         /* a bio clone to be passed down to OSD req */
1503                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504                         op_size = rbd_get_segment(&rbd_dev->header,
1505                                                   rbd_dev->header.object_prefix,
1506                                                   ofs, size,
1507                                                   NULL, NULL);
1508                         kref_get(&coll->kref);
1509                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510                                               op_size, GFP_ATOMIC);
1511                         if (!bio) {
1512                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1513                                                        -ENOMEM, op_size);
1514                                 goto next_seg;
1515                         }
1516
1517
1518                         /* init OSD command: write or read */
1519                         if (do_write)
1520                                 rbd_req_write(rq, rbd_dev,
1521                                               snapc,
1522                                               ofs,
1523                                               op_size, bio,
1524                                               coll, cur_seg);
1525                         else
1526                                 rbd_req_read(rq, rbd_dev,
1527                                              rbd_dev->snap_id,
1528                                              ofs,
1529                                              op_size, bio,
1530                                              coll, cur_seg);
1531
1532 next_seg:
1533                         size -= op_size;
1534                         ofs += op_size;
1535
1536                         cur_seg++;
1537                         rq_bio = next_bio;
1538                 } while (size > 0);
1539                 kref_put(&coll->kref, rbd_coll_release);
1540
1541                 if (bp)
1542                         bio_pair_release(bp);
1543                 spin_lock_irq(q->queue_lock);
1544
1545                 ceph_put_snap_context(snapc);
1546         }
1547 }
1548
1549 /*
1550  * a queue callback. Makes sure that we don't create a bio that spans across
1551  * multiple osd objects. One exception would be with a single page bios,
1552  * which we handle later at bio_chain_clone
1553  */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;	/* backing object size, in sectors */
	sector_t sector;		/* absolute starting sector of the bio */
	unsigned int bio_sectors;	/* sectors already in the bio */
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes left before the bio would cross an object boundary
	 * (chunk_sectors is a power of two, so the mask works) */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* allow a single-vector bio to span objects; the split is
	 * handled later at bio_chain_clone (see comment above) */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1575
/*
 * Tear down the gendisk and associated state for @rbd_dev.
 * Safe to call when no disk was ever allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	/* release the in-memory header (snap context, names, sizes) */
	rbd_header_free(&rbd_dev->header);

	/* undo rbd_init_disk(): unregister, then free queue and disk */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1591
1592 /*
 * re-read the on-disk header
1594  */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		/* on the first pass snap_count is 0, so this parses
		 * only the fixed-size part of the header */
		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* done once the snapshot count is stable between reads */
		if (snap_count == header->total_snaps)
			break;

		/* re-read with room for all snap records and names */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1649
1650 /*
1651  * create a snapshot
1652  */
1653 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654                                const char *snap_name,
1655                                gfp_t gfp_flags)
1656 {
1657         int name_len = strlen(snap_name);
1658         u64 new_snapid;
1659         int ret;
1660         void *data, *p, *e;
1661         u64 ver;
1662         struct ceph_mon_client *monc;
1663
1664         /* we should create a snapshot only if we're pointing at the head */
1665         if (rbd_dev->snap_id != CEPH_NOSNAP)
1666                 return -EINVAL;
1667
1668         monc = &rbd_dev->rbd_client->client->monc;
1669         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1670         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1671         if (ret < 0)
1672                 return ret;
1673
1674         data = kmalloc(name_len + 16, gfp_flags);
1675         if (!data)
1676                 return -ENOMEM;
1677
1678         p = data;
1679         e = data + name_len + 16;
1680
1681         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1682         ceph_encode_64_safe(&p, e, new_snapid, bad);
1683
1684         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1685                                 "rbd", "snap_add",
1686                                 data, p - data, &ver);
1687
1688         kfree(data);
1689
1690         return ret < 0 ? ret : 0;
1691 bad:
1692         return -ERANGE;
1693 }
1694
/* Remove every snapshot device entry attached to @rbd_dev */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant: __rbd_remove_snap_dev() unlinks the entry */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(rbd_dev, snap);
}
1703
1704 /*
 * re-read the on-disk header (including snapshot info) and update
 * the in-memory copy
1706  */
1707 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1708 {
1709         int ret;
1710         struct rbd_image_header h;
1711
1712         ret = rbd_read_header(rbd_dev, &h);
1713         if (ret < 0)
1714                 return ret;
1715
1716         down_write(&rbd_dev->header_rwsem);
1717
1718         /* resized? */
1719         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1720                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1721
1722                 dout("setting size to %llu sectors", (unsigned long long) size);
1723                 set_capacity(rbd_dev->disk, size);
1724         }
1725
1726         /* rbd_dev->header.object_prefix shouldn't change */
1727         kfree(rbd_dev->header.snap_sizes);
1728         kfree(rbd_dev->header.snap_names);
1729         /* osd requests may still refer to snapc */
1730         ceph_put_snap_context(rbd_dev->header.snapc);
1731
1732         rbd_dev->header.obj_version = h.obj_version;
1733         rbd_dev->header.image_size = h.image_size;
1734         rbd_dev->header.total_snaps = h.total_snaps;
1735         rbd_dev->header.snapc = h.snapc;
1736         rbd_dev->header.snap_names = h.snap_names;
1737         rbd_dev->header.snap_names_len = h.snap_names_len;
1738         rbd_dev->header.snap_sizes = h.snap_sizes;
1739         /* Free the extra copy of the object prefix */
1740         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1741         kfree(h.object_prefix);
1742
1743         ret = __rbd_init_snaps_header(rbd_dev);
1744
1745         up_write(&rbd_dev->header_rwsem);
1746
1747         return ret;
1748 }
1749
/*
 * Set up the gendisk and request queue for @rbd_dev: read the image
 * header, select the mapped snapshot, size the queue limits to the
 * backing object size, and announce the disk.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning objects (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1822
1823 /*
1824   sysfs
1825 */
1826
/* Map a sysfs struct device back to the rbd_device embedding it. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1831
/*
 * sysfs "size" attribute: current mapped size in bytes.
 * The capacity is sampled under header_rwsem.
 */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	/* capacity is in 512-byte sectors; report bytes */
	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1844
1845 static ssize_t rbd_major_show(struct device *dev,
1846                               struct device_attribute *attr, char *buf)
1847 {
1848         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849
1850         return sprintf(buf, "%d\n", rbd_dev->major);
1851 }
1852
/* sysfs "client_id" attribute: id of the ceph client instance in use. */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1861
1862 static ssize_t rbd_pool_show(struct device *dev,
1863                              struct device_attribute *attr, char *buf)
1864 {
1865         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1866
1867         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1868 }
1869
1870 static ssize_t rbd_pool_id_show(struct device *dev,
1871                              struct device_attribute *attr, char *buf)
1872 {
1873         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874
1875         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1876 }
1877
1878 static ssize_t rbd_name_show(struct device *dev,
1879                              struct device_attribute *attr, char *buf)
1880 {
1881         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1882
1883         return sprintf(buf, "%s\n", rbd_dev->image_name);
1884 }
1885
/* sysfs "current_snap" attribute: name of the mapped snapshot. */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1894
1895 static ssize_t rbd_image_refresh(struct device *dev,
1896                                  struct device_attribute *attr,
1897                                  const char *buf,
1898                                  size_t size)
1899 {
1900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901         int rc;
1902         int ret = size;
1903
1904         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1905
1906         rc = __rbd_refresh_header(rbd_dev);
1907         if (rc < 0)
1908                 ret = rc;
1909
1910         mutex_unlock(&ctl_mutex);
1911         return ret;
1912 }
1913
/*
 * Device attributes exposed under /sys/bus/rbd/devices/<id>/.
 * "refresh" and "create_snap" are write-only controls; the others are
 * read-only status.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Intentionally empty: rbd_bus_add_dev() installs rbd_dev_release()
 * as the per-device ->release, which performs the real teardown.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1955
1956
1957 /*
1958   sysfs - snapshots
1959 */
1960
1961 static ssize_t rbd_snap_size_show(struct device *dev,
1962                                   struct device_attribute *attr,
1963                                   char *buf)
1964 {
1965         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966
1967         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1968 }
1969
/* sysfs "snap_id" attribute: the snapshot's id from the snap context. */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
1978
/* Per-snapshot sysfs attributes, shown beneath the parent rbd device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Final put on a snapshot device: free the name, then the rbd_snap. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2008
/*
 * Unlink @snap from the device's snap list and unregister its sysfs
 * device.  Once the last reference drops, rbd_snap_dev_release()
 * frees snap->name and the rbd_snap itself, so @snap must not be
 * used after this returns.  @rbd_dev is currently unused.
 * NOTE(review): callers appear to serialize via ctl_mutex — confirm.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2015
2016 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2017                                   struct rbd_snap *snap,
2018                                   struct device *parent)
2019 {
2020         struct device *dev = &snap->dev;
2021         int ret;
2022
2023         dev->type = &rbd_snap_device_type;
2024         dev->parent = parent;
2025         dev->release = rbd_snap_dev_release;
2026         dev_set_name(dev, "snap_%s", snap->name);
2027         ret = device_register(dev);
2028
2029         return ret;
2030 }
2031
2032 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2033                               int i, const char *name,
2034                               struct rbd_snap **snapp)
2035 {
2036         int ret;
2037         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2038         if (!snap)
2039                 return -ENOMEM;
2040         snap->name = kstrdup(name, GFP_KERNEL);
2041         snap->size = rbd_dev->header.snap_sizes[i];
2042         snap->id = rbd_dev->header.snapc->snaps[i];
2043         if (device_is_registered(&rbd_dev->dev)) {
2044                 ret = rbd_register_snap_dev(rbd_dev, snap,
2045                                              &rbd_dev->dev);
2046                 if (ret < 0)
2047                         goto err;
2048         }
2049         *snapp = snap;
2050         return 0;
2051 err:
2052         kfree(snap->name);
2053         kfree(snap);
2054         return ret;
2055 }
2056
2057 /*
2058  * search for the previous snap in a null delimited string list
2059  */
2060 const char *rbd_prev_snap_name(const char *name, const char *start)
2061 {
2062         if (name < start + 2)
2063                 return NULL;
2064
2065         name -= 2;
2066         while (*name) {
2067                 if (name == start)
2068                         return start;
2069                 name--;
2070         }
2071         return name + 1;
2072 }
2073
2074 /*
2075  * compare the old list of snapshots that we have to what's in the header
2076  * and update it accordingly. Note that the header holds the snapshots
2077  * in a reverse order (from newest to oldest) and we need to go from
2078  * older to new so that we don't get a duplicate snap name when
2079  * doing the process (e.g., removed snapshot and recreated a new
2080  * one with the same name.
2081  */
2082 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2083 {
2084         const char *name, *first_name;
2085         int i = rbd_dev->header.total_snaps;
2086         struct rbd_snap *snap, *old_snap = NULL;
2087         int ret;
2088         struct list_head *p, *n;
2089
2090         first_name = rbd_dev->header.snap_names;
2091         name = first_name + rbd_dev->header.snap_names_len;
2092
2093         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2094                 u64 cur_id;
2095
2096                 old_snap = list_entry(p, struct rbd_snap, node);
2097
2098                 if (i)
2099                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2100
2101                 if (!i || old_snap->id < cur_id) {
2102                         /*
2103                          * old_snap->id was skipped, thus was
2104                          * removed.  If this rbd_dev is mapped to
2105                          * the removed snapshot, record that it no
2106                          * longer exists, to prevent further I/O.
2107                          */
2108                         if (rbd_dev->snap_id == old_snap->id)
2109                                 rbd_dev->snap_exists = false;
2110                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2111                         continue;
2112                 }
2113                 if (old_snap->id == cur_id) {
2114                         /* we have this snapshot already */
2115                         i--;
2116                         name = rbd_prev_snap_name(name, first_name);
2117                         continue;
2118                 }
2119                 for (; i > 0;
2120                      i--, name = rbd_prev_snap_name(name, first_name)) {
2121                         if (!name) {
2122                                 WARN_ON(1);
2123                                 return -EINVAL;
2124                         }
2125                         cur_id = rbd_dev->header.snapc->snaps[i];
2126                         /* snapshot removal? handle it above */
2127                         if (cur_id >= old_snap->id)
2128                                 break;
2129                         /* a new snapshot */
2130                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2131                         if (ret < 0)
2132                                 return ret;
2133
2134                         /* note that we add it backward so using n and not p */
2135                         list_add(&snap->node, n);
2136                         p = &snap->node;
2137                 }
2138         }
2139         /* we're done going over the old snap list, just add what's left */
2140         for (; i > 0; i--) {
2141                 name = rbd_prev_snap_name(name, first_name);
2142                 if (!name) {
2143                         WARN_ON(1);
2144                         return -EINVAL;
2145                 }
2146                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2147                 if (ret < 0)
2148                         return ret;
2149                 list_add(&snap->node, &rbd_dev->snaps);
2150         }
2151
2152         return 0;
2153 }
2154
/*
 * Register the rbd device (and any already-discovered snapshots)
 * with the driver core, making it visible under /sys/bus/rbd/.
 *
 * NOTE(review): if one snapshot registration fails mid-loop, the
 * earlier ones stay registered and the error is returned — confirm
 * callers recover via rbd_bus_del_dev().
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* rbd_dev_release() performs the full teardown on final put */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2183
/*
 * Remove the rbd device from sysfs.  Actual teardown happens in
 * rbd_dev_release() (installed as ->release by rbd_bus_add_dev())
 * once the last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2188
2189 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2190 {
2191         int ret, rc;
2192
2193         do {
2194                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
2195                                          rbd_dev->header.obj_version);
2196                 if (ret == -ERANGE) {
2197                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2198                         rc = __rbd_refresh_header(rbd_dev);
2199                         mutex_unlock(&ctl_mutex);
2200                         if (rc < 0)
2201                                 return rc;
2202                 }
2203         } while (ret == -ERANGE);
2204
2205         return ret;
2206 }
2207
/* Highest device id handed out so far; see rbd_id_put() for reclaim. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2222
2223 /*
2224  * Remove an rbd_dev from the global list, and record that its
2225  * identifier is no longer in use.
2226  */
2227 static void rbd_id_put(struct rbd_device *rbd_dev)
2228 {
2229         struct list_head *tmp;
2230         int rbd_id = rbd_dev->id;
2231         int max_id;
2232
2233         BUG_ON(rbd_id < 1);
2234
2235         spin_lock(&rbd_dev_list_lock);
2236         list_del_init(&rbd_dev->node);
2237
2238         /*
2239          * If the id being "put" is not the current maximum, there
2240          * is nothing special we need to do.
2241          */
2242         if (rbd_id != atomic64_read(&rbd_id_max)) {
2243                 spin_unlock(&rbd_dev_list_lock);
2244                 return;
2245         }
2246
2247         /*
2248          * We need to update the current maximum id.  Search the
2249          * list to find out what it is.  We're more likely to find
2250          * the maximum at the end, so search the list backward.
2251          */
2252         max_id = 0;
2253         list_for_each_prev(tmp, &rbd_dev_list) {
2254                 struct rbd_device *rbd_dev;
2255
2256                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2257                 if (rbd_id > max_id)
2258                         max_id = rbd_id;
2259         }
2260         spin_unlock(&rbd_dev_list_lock);
2261
2262         /*
2263          * The max id could have been updated by rbd_id_get(), in
2264          * which case it now accurately reflects the new maximum.
2265          * Be careful not to overwrite the maximum value in that
2266          * case.
2267          */
2268         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2269 }
2270
2271 /*
2272  * Skips over white space at *buf, and updates *buf to point to the
2273  * first found non-space character (if any). Returns the length of
2274  * the token (string of non-white space characters) found.  Note
2275  * that *buf must be terminated with '\0'.
2276  */
2277 static inline size_t next_token(const char **buf)
2278 {
2279         /*
2280         * These are the characters that produce nonzero for
2281         * isspace() in the "C" and "POSIX" locales.
2282         */
2283         const char *spaces = " \f\n\r\t\v";
2284
2285         *buf += strspn(*buf, spaces);   /* Find start of token */
2286
2287         return strcspn(*buf, spaces);   /* Return token length */
2288 }
2289
2290 /*
2291  * Finds the next token in *buf, and if the provided token buffer is
2292  * big enough, copies the found token into it.  The result, if
2293  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2294  * must be terminated with '\0' on entry.
2295  *
2296  * Returns the length of the token found (not including the '\0').
2297  * Return value will be 0 if no token is found, and it will be >=
2298  * token_size if the token would not fit.
2299  *
2300  * The *buf pointer will be updated to point beyond the end of the
2301  * found token.  Note that this occurs even if the token buffer is
2302  * too small to hold it.
2303  */
2304 static inline size_t copy_token(const char **buf,
2305                                 char *token,
2306                                 size_t token_size)
2307 {
2308         size_t len;
2309
2310         len = next_token(buf);
2311         if (len < token_size) {
2312                 memcpy(token, *buf, len);
2313                 *(token + len) = '\0';
2314         }
2315         *buf += len;
2316
2317         return len;
2318 }
2319
2320 /*
2321  * Finds the next token in *buf, dynamically allocates a buffer big
2322  * enough to hold a copy of it, and copies the token into the new
2323  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2324  * that a duplicate buffer is created even for a zero-length token.
2325  *
2326  * Returns a pointer to the newly-allocated duplicate, or a null
2327  * pointer if memory for the duplicate was not available.  If
2328  * the lenp argument is a non-null pointer, the length of the token
2329  * (not including the '\0') is returned in *lenp.
2330  *
2331  * If successful, the *buf pointer will be updated to point beyond
2332  * the end of the found token.
2333  *
2334  * Note: uses GFP_KERNEL for allocation.
2335  */
2336 static inline char *dup_token(const char **buf, size_t *lenp)
2337 {
2338         char *dup;
2339         size_t len;
2340
2341         len = next_token(buf);
2342         dup = kmalloc(len + 1, GFP_KERNEL);
2343         if (!dup)
2344                 return NULL;
2345
2346         memcpy(dup, *buf, len);
2347         *(dup + len) = '\0';
2348         *buf += len;
2349
2350         if (lenp)
2351                 *lenp = len;
2352
2353         return dup;
2354 }
2355
2356 /*
2357  * This fills in the pool_name, image_name, image_name_len, snap_name,
2358  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2359  * on the list of monitor addresses and other options provided via
2360  * /sys/bus/rbd/add.
2361  *
2362  * Note: rbd_dev is assumed to have been initially zero-filled.
2363  */
2364 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2365                               const char *buf,
2366                               const char **mon_addrs,
2367                               size_t *mon_addrs_size,
2368                               char *options,
2369                              size_t options_size)
2370 {
2371         size_t len;
2372         int ret;
2373
2374         /* The first four tokens are required */
2375
2376         len = next_token(&buf);
2377         if (!len)
2378                 return -EINVAL;
2379         *mon_addrs_size = len + 1;
2380         *mon_addrs = buf;
2381
2382         buf += len;
2383
2384         len = copy_token(&buf, options, options_size);
2385         if (!len || len >= options_size)
2386                 return -EINVAL;
2387
2388         ret = -ENOMEM;
2389         rbd_dev->pool_name = dup_token(&buf, NULL);
2390         if (!rbd_dev->pool_name)
2391                 goto out_err;
2392
2393         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2394         if (!rbd_dev->image_name)
2395                 goto out_err;
2396
2397         /* Create the name of the header object */
2398
2399         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2400                                                 + sizeof (RBD_SUFFIX),
2401                                         GFP_KERNEL);
2402         if (!rbd_dev->header_name)
2403                 goto out_err;
2404         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2405
2406         /*
2407          * The snapshot name is optional.  If none is is supplied,
2408          * we use the default value.
2409          */
2410         rbd_dev->snap_name = dup_token(&buf, &len);
2411         if (!rbd_dev->snap_name)
2412                 goto out_err;
2413         if (!len) {
2414                 /* Replace the empty name with the default */
2415                 kfree(rbd_dev->snap_name);
2416                 rbd_dev->snap_name
2417                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2418                 if (!rbd_dev->snap_name)
2419                         goto out_err;
2420
2421                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2422                         sizeof (RBD_SNAP_HEAD_NAME));
2423         }
2424
2425         return 0;
2426
2427 out_err:
2428         kfree(rbd_dev->header_name);
2429         kfree(rbd_dev->image_name);
2430         kfree(rbd_dev->pool_name);
2431         rbd_dev->pool_name = NULL;
2432
2433         return ret;
2434 }
2435
/*
 * sysfs /sys/bus/rbd/add: parse "<mon addrs> <options> <pool> <image>
 * [<snap>]", connect to the cluster, and map the image as a block
 * device.  Returns @count on success or a negative errno.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* dropped again in rbd_dev_release() (or on the error paths) */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	/*
	 * NOTE(review): options is freed on every error path but not
	 * here on success — looks like a leak, assuming rbd_get_client()
	 * copies what it needs from the string.  TODO confirm and add
	 * kfree(options) before returning.
	 */
	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* non-NULL pool_name means parse_args succeeded (see out_err there) */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2545
2546 static struct rbd_device *__rbd_get_dev(unsigned long id)
2547 {
2548         struct list_head *tmp;
2549         struct rbd_device *rbd_dev;
2550
2551         spin_lock(&rbd_dev_list_lock);
2552         list_for_each(tmp, &rbd_dev_list) {
2553                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2554                 if (rbd_dev->id == id) {
2555                         spin_unlock(&rbd_dev_list_lock);
2556                         return rbd_dev;
2557                 }
2558         }
2559         spin_unlock(&rbd_dev_list_lock);
2560         return NULL;
2561 }
2562
/*
 * Release callback for the rbd device: undoes what rbd_add() set up,
 * roughly in reverse order.  Runs when the last reference to
 * rbd_dev->dev is dropped (after rbd_bus_del_dev()).
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* stop the lingering watch on the header object, if any */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2593
2594 static ssize_t rbd_remove(struct bus_type *bus,
2595                           const char *buf,
2596                           size_t count)
2597 {
2598         struct rbd_device *rbd_dev = NULL;
2599         int target_id, rc;
2600         unsigned long ul;
2601         int ret = count;
2602
2603         rc = strict_strtoul(buf, 10, &ul);
2604         if (rc)
2605                 return rc;
2606
2607         /* convert to int; abort if we lost anything in the conversion */
2608         target_id = (int) ul;
2609         if (target_id != ul)
2610                 return -EINVAL;
2611
2612         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2613
2614         rbd_dev = __rbd_get_dev(target_id);
2615         if (!rbd_dev) {
2616                 ret = -ENOENT;
2617                 goto done;
2618         }
2619
2620         __rbd_remove_all_snaps(rbd_dev);
2621         rbd_bus_del_dev(rbd_dev);
2622
2623 done:
2624         mutex_unlock(&ctl_mutex);
2625         return ret;
2626 }
2627
/*
 * sysfs "create_snap" store: create a snapshot named by the written
 * string, then refresh the header and notify watchers.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * Copies at most count - 1 bytes, dropping the final byte of
	 * buf — presumably the trailing newline from "echo".
	 * NOTE(review): a write without a trailing newline loses its
	 * last character — confirm this is intended.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2668
2669 /*
2670  * create control files in sysfs
2671  * /sys/bus/rbd/...
2672  */
2673 static int rbd_sysfs_init(void)
2674 {
2675         int ret;
2676
2677         ret = device_register(&rbd_root_dev);
2678         if (ret < 0)
2679                 return ret;
2680
2681         ret = bus_register(&rbd_bus_type);
2682         if (ret < 0)
2683                 device_unregister(&rbd_root_dev);
2684
2685         return ret;
2686 }
2687
/* Tear down /sys/bus/rbd in the reverse order of rbd_sysfs_init(). */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2693
2694 int __init rbd_init(void)
2695 {
2696         int rc;
2697
2698         rc = rbd_sysfs_init();
2699         if (rc)
2700                 return rc;
2701         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2702         return 0;
2703 }
2704
/* Module exit: remove the sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2709
/* Module entry/exit points and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");