rbd: set image size when header is updated
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
94 struct rbd_options {
95         int     notify_timeout;
96 };
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *image_name;
165         size_t                  image_name_len;
166         char                    *header_name;
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         /* name of the snapshot this device reads from */
176         char                    *snap_name;
177         /* id of the snapshot this device reads from */
178         u64                     snap_id;        /* current snapshot id */
179         /* whether the snap_id this device reads from still exists */
180         bool                    snap_exists;
181         int                     read_only;
182
183         struct list_head        node;
184
185         /* list of snapshots */
186         struct list_head        snaps;
187
188         /* sysfs related */
189         struct device           dev;
190 };
191
192 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
193
194 static LIST_HEAD(rbd_dev_list);    /* devices */
195 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196
197 static LIST_HEAD(rbd_client_list);              /* clients */
198 static DEFINE_SPINLOCK(rbd_client_list_lock);
199
200 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201 static void rbd_dev_release(struct device *dev);
202 static ssize_t rbd_snap_add(struct device *dev,
203                             struct device_attribute *attr,
204                             const char *buf,
205                             size_t count);
206 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
207                                   struct rbd_snap *snap);
208
209 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
210                        size_t count);
211 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212                           size_t count);
213
214 static struct bus_attribute rbd_bus_attrs[] = {
215         __ATTR(add, S_IWUSR, NULL, rbd_add),
216         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217         __ATTR_NULL
218 };
219
220 static struct bus_type rbd_bus_type = {
221         .name           = "rbd",
222         .bus_attrs      = rbd_bus_attrs,
223 };
224
225 static void rbd_root_dev_release(struct device *dev)
226 {
227 }
228
229 static struct device rbd_root_dev = {
230         .init_name =    "rbd",
231         .release =      rbd_root_dev_release,
232 };
233
234
235 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236 {
237         return get_device(&rbd_dev->dev);
238 }
239
240 static void rbd_put_dev(struct rbd_device *rbd_dev)
241 {
242         put_device(&rbd_dev->dev);
243 }
244
245 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
246
247 static int rbd_open(struct block_device *bdev, fmode_t mode)
248 {
249         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
250
251         rbd_get_dev(rbd_dev);
252
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256                 return -EROFS;
257
258         return 0;
259 }
260
261 static int rbd_release(struct gendisk *disk, fmode_t mode)
262 {
263         struct rbd_device *rbd_dev = disk->private_data;
264
265         rbd_put_dev(rbd_dev);
266
267         return 0;
268 }
269
270 static const struct block_device_operations rbd_bd_ops = {
271         .owner                  = THIS_MODULE,
272         .open                   = rbd_open,
273         .release                = rbd_release,
274 };
275
276 /*
277  * Initialize an rbd client instance.
278  * We own *ceph_opts.
279  */
280 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
281                                             struct rbd_options *rbd_opts)
282 {
283         struct rbd_client *rbdc;
284         int ret = -ENOMEM;
285
286         dout("rbd_client_create\n");
287         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288         if (!rbdc)
289                 goto out_opt;
290
291         kref_init(&rbdc->kref);
292         INIT_LIST_HEAD(&rbdc->node);
293
294         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
296         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
297         if (IS_ERR(rbdc->client))
298                 goto out_mutex;
299         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
300
301         ret = ceph_open_session(rbdc->client);
302         if (ret < 0)
303                 goto out_err;
304
305         rbdc->rbd_opts = rbd_opts;
306
307         spin_lock(&rbd_client_list_lock);
308         list_add_tail(&rbdc->node, &rbd_client_list);
309         spin_unlock(&rbd_client_list_lock);
310
311         mutex_unlock(&ctl_mutex);
312
313         dout("rbd_client_create created %p\n", rbdc);
314         return rbdc;
315
316 out_err:
317         ceph_destroy_client(rbdc->client);
318 out_mutex:
319         mutex_unlock(&ctl_mutex);
320         kfree(rbdc);
321 out_opt:
322         if (ceph_opts)
323                 ceph_destroy_options(ceph_opts);
324         return ERR_PTR(ret);
325 }
326
327 /*
328  * Find a ceph client with specific addr and configuration.
329  */
330 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
331 {
332         struct rbd_client *client_node;
333
334         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335                 return NULL;
336
337         list_for_each_entry(client_node, &rbd_client_list, node)
338                 if (!ceph_compare_options(ceph_opts, client_node->client))
339                         return client_node;
340         return NULL;
341 }
342
343 /*
344  * mount options
345  */
346 enum {
347         Opt_notify_timeout,
348         Opt_last_int,
349         /* int args above */
350         Opt_last_string,
351         /* string args above */
352 };
353
354 static match_table_t rbd_opts_tokens = {
355         {Opt_notify_timeout, "notify_timeout=%d"},
356         /* int args above */
357         /* string args above */
358         {-1, NULL}
359 };
360
361 static int parse_rbd_opts_token(char *c, void *private)
362 {
363         struct rbd_options *rbd_opts = private;
364         substring_t argstr[MAX_OPT_ARGS];
365         int token, intval, ret;
366
367         token = match_token(c, rbd_opts_tokens, argstr);
368         if (token < 0)
369                 return -EINVAL;
370
371         if (token < Opt_last_int) {
372                 ret = match_int(&argstr[0], &intval);
373                 if (ret < 0) {
374                         pr_err("bad mount option arg (not int) "
375                                "at '%s'\n", c);
376                         return ret;
377                 }
378                 dout("got int token %d val %d\n", token, intval);
379         } else if (token > Opt_last_int && token < Opt_last_string) {
380                 dout("got string token %d val %s\n", token,
381                      argstr[0].from);
382         } else {
383                 dout("got token %d\n", token);
384         }
385
386         switch (token) {
387         case Opt_notify_timeout:
388                 rbd_opts->notify_timeout = intval;
389                 break;
390         default:
391                 BUG_ON(token);
392         }
393         return 0;
394 }
395
396 /*
397  * Get a ceph client with specific addr and configuration, if one does
398  * not exist create it.
399  */
400 static struct rbd_client *rbd_get_client(const char *mon_addr,
401                                          size_t mon_addr_len,
402                                          char *options)
403 {
404         struct rbd_client *rbdc;
405         struct ceph_options *ceph_opts;
406         struct rbd_options *rbd_opts;
407
408         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
409         if (!rbd_opts)
410                 return ERR_PTR(-ENOMEM);
411
412         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
413
414         ceph_opts = ceph_parse_options(options, mon_addr,
415                                         mon_addr + mon_addr_len,
416                                         parse_rbd_opts_token, rbd_opts);
417         if (IS_ERR(ceph_opts)) {
418                 kfree(rbd_opts);
419                 return ERR_CAST(ceph_opts);
420         }
421
422         spin_lock(&rbd_client_list_lock);
423         rbdc = __rbd_client_find(ceph_opts);
424         if (rbdc) {
425                 /* using an existing client */
426                 kref_get(&rbdc->kref);
427                 spin_unlock(&rbd_client_list_lock);
428
429                 ceph_destroy_options(ceph_opts);
430                 kfree(rbd_opts);
431
432                 return rbdc;
433         }
434         spin_unlock(&rbd_client_list_lock);
435
436         rbdc = rbd_client_create(ceph_opts, rbd_opts);
437
438         if (IS_ERR(rbdc))
439                 kfree(rbd_opts);
440
441         return rbdc;
442 }
443
444 /*
445  * Destroy ceph client
446  *
447  * Caller must hold rbd_client_list_lock.
448  */
449 static void rbd_client_release(struct kref *kref)
450 {
451         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453         dout("rbd_release_client %p\n", rbdc);
454         spin_lock(&rbd_client_list_lock);
455         list_del(&rbdc->node);
456         spin_unlock(&rbd_client_list_lock);
457
458         ceph_destroy_client(rbdc->client);
459         kfree(rbdc->rbd_opts);
460         kfree(rbdc);
461 }
462
463 /*
464  * Drop reference to ceph client node. If it's not referenced anymore, release
465  * it.
466  */
467 static void rbd_put_client(struct rbd_device *rbd_dev)
468 {
469         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470         rbd_dev->rbd_client = NULL;
471 }
472
473 /*
474  * Destroy requests collection
475  */
476 static void rbd_coll_release(struct kref *kref)
477 {
478         struct rbd_req_coll *coll =
479                 container_of(kref, struct rbd_req_coll, kref);
480
481         dout("rbd_coll_release %p\n", coll);
482         kfree(coll);
483 }
484
485 /*
486  * Create a new header structure, translate header format from the on-disk
487  * header.
488  */
489 static int rbd_header_from_disk(struct rbd_image_header *header,
490                                  struct rbd_image_header_ondisk *ondisk,
491                                  u32 allocated_snaps,
492                                  gfp_t gfp_flags)
493 {
494         u32 i, snap_count;
495
496         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497                 return -ENXIO;
498
499         snap_count = le32_to_cpu(ondisk->snap_count);
500         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
501                          / sizeof (*ondisk))
502                 return -EINVAL;
503         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
504                                 snap_count * sizeof(u64),
505                                 gfp_flags);
506         if (!header->snapc)
507                 return -ENOMEM;
508
509         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
510         if (snap_count) {
511                 header->snap_names = kmalloc(header->snap_names_len,
512                                              gfp_flags);
513                 if (!header->snap_names)
514                         goto err_snapc;
515                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
516                                              gfp_flags);
517                 if (!header->snap_sizes)
518                         goto err_names;
519         } else {
520                 header->snap_names = NULL;
521                 header->snap_sizes = NULL;
522         }
523
524         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
525                                         gfp_flags);
526         if (!header->object_prefix)
527                 goto err_sizes;
528
529         memcpy(header->object_prefix, ondisk->block_name,
530                sizeof(ondisk->block_name));
531         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
532
533         header->image_size = le64_to_cpu(ondisk->image_size);
534         header->obj_order = ondisk->options.order;
535         header->crypt_type = ondisk->options.crypt_type;
536         header->comp_type = ondisk->options.comp_type;
537
538         atomic_set(&header->snapc->nref, 1);
539         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
540         header->snapc->num_snaps = snap_count;
541         header->total_snaps = snap_count;
542
543         if (snap_count && allocated_snaps == snap_count) {
544                 for (i = 0; i < snap_count; i++) {
545                         header->snapc->snaps[i] =
546                                 le64_to_cpu(ondisk->snaps[i].id);
547                         header->snap_sizes[i] =
548                                 le64_to_cpu(ondisk->snaps[i].image_size);
549                 }
550
551                 /* copy snapshot names */
552                 memcpy(header->snap_names, &ondisk->snaps[i],
553                         header->snap_names_len);
554         }
555
556         return 0;
557
558 err_sizes:
559         kfree(header->snap_sizes);
560 err_names:
561         kfree(header->snap_names);
562 err_snapc:
563         kfree(header->snapc);
564         return -ENOMEM;
565 }
566
567 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568                         u64 *seq, u64 *size)
569 {
570         int i;
571         char *p = header->snap_names;
572
573         for (i = 0; i < header->total_snaps; i++) {
574                 if (!strcmp(snap_name, p)) {
575
576                         /* Found it.  Pass back its id and/or size */
577
578                         if (seq)
579                                 *seq = header->snapc->snaps[i];
580                         if (size)
581                                 *size = header->snap_sizes[i];
582                         return i;
583                 }
584                 p += strlen(p) + 1;     /* Skip ahead to the next name */
585         }
586         return -ENOENT;
587 }
588
589 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
590 {
591         struct rbd_image_header *header = &rbd_dev->header;
592         struct ceph_snap_context *snapc = header->snapc;
593         int ret = -ENOENT;
594
595         down_write(&rbd_dev->header_rwsem);
596
597         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
598                     sizeof (RBD_SNAP_HEAD_NAME))) {
599                 if (header->total_snaps)
600                         snapc->seq = header->snap_seq;
601                 else
602                         snapc->seq = 0;
603                 rbd_dev->snap_id = CEPH_NOSNAP;
604                 rbd_dev->snap_exists = false;
605                 rbd_dev->read_only = 0;
606                 if (size)
607                         *size = header->image_size;
608         } else {
609                 ret = snap_by_name(header, rbd_dev->snap_name,
610                                         &snapc->seq, size);
611                 if (ret < 0)
612                         goto done;
613                 rbd_dev->snap_id = snapc->seq;
614                 rbd_dev->snap_exists = true;
615                 rbd_dev->read_only = 1;
616         }
617
618         ret = 0;
619 done:
620         up_write(&rbd_dev->header_rwsem);
621         return ret;
622 }
623
624 static void rbd_header_free(struct rbd_image_header *header)
625 {
626         kfree(header->object_prefix);
627         kfree(header->snap_sizes);
628         kfree(header->snap_names);
629         kfree(header->snapc);
630 }
631
632 /*
633  * get the actual striped segment name, offset and length
634  */
635 static u64 rbd_get_segment(struct rbd_image_header *header,
636                            const char *object_prefix,
637                            u64 ofs, u64 len,
638                            char *seg_name, u64 *segofs)
639 {
640         u64 seg = ofs >> header->obj_order;
641
642         if (seg_name)
643                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
644                          "%s.%012llx", object_prefix, seg);
645
646         ofs = ofs & ((1 << header->obj_order) - 1);
647         len = min_t(u64, len, (1 << header->obj_order) - ofs);
648
649         if (segofs)
650                 *segofs = ofs;
651
652         return len;
653 }
654
655 static int rbd_get_num_segments(struct rbd_image_header *header,
656                                 u64 ofs, u64 len)
657 {
658         u64 start_seg = ofs >> header->obj_order;
659         u64 end_seg = (ofs + len - 1) >> header->obj_order;
660         return end_seg - start_seg + 1;
661 }
662
663 /*
664  * returns the size of an object in the image
665  */
666 static u64 rbd_obj_bytes(struct rbd_image_header *header)
667 {
668         return 1 << header->obj_order;
669 }
670
671 /*
672  * bio helpers
673  */
674
675 static void bio_chain_put(struct bio *chain)
676 {
677         struct bio *tmp;
678
679         while (chain) {
680                 tmp = chain;
681                 chain = chain->bi_next;
682                 bio_put(tmp);
683         }
684 }
685
686 /*
687  * zeros a bio chain, starting at specific offset
688  */
689 static void zero_bio_chain(struct bio *chain, int start_ofs)
690 {
691         struct bio_vec *bv;
692         unsigned long flags;
693         void *buf;
694         int i;
695         int pos = 0;
696
697         while (chain) {
698                 bio_for_each_segment(bv, chain, i) {
699                         if (pos + bv->bv_len > start_ofs) {
700                                 int remainder = max(start_ofs - pos, 0);
701                                 buf = bvec_kmap_irq(bv, &flags);
702                                 memset(buf + remainder, 0,
703                                        bv->bv_len - remainder);
704                                 bvec_kunmap_irq(buf, &flags);
705                         }
706                         pos += bv->bv_len;
707                 }
708
709                 chain = chain->bi_next;
710         }
711 }
712
713 /*
714  * bio_chain_clone - clone a chain of bios up to a certain length.
715  * might return a bio_pair that will need to be released.
716  */
717 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
718                                    struct bio_pair **bp,
719                                    int len, gfp_t gfpmask)
720 {
721         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
722         int total = 0;
723
724         if (*bp) {
725                 bio_pair_release(*bp);
726                 *bp = NULL;
727         }
728
729         while (old_chain && (total < len)) {
730                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
731                 if (!tmp)
732                         goto err_out;
733
734                 if (total + old_chain->bi_size > len) {
735                         struct bio_pair *bp;
736
737                         /*
738                          * this split can only happen with a single paged bio,
739                          * split_bio will BUG_ON if this is not the case
740                          */
741                         dout("bio_chain_clone split! total=%d remaining=%d"
742                              "bi_size=%d\n",
743                              (int)total, (int)len-total,
744                              (int)old_chain->bi_size);
745
746                         /* split the bio. We'll release it either in the next
747                            call, or it will have to be released outside */
748                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
749                         if (!bp)
750                                 goto err_out;
751
752                         __bio_clone(tmp, &bp->bio1);
753
754                         *next = &bp->bio2;
755                 } else {
756                         __bio_clone(tmp, old_chain);
757                         *next = old_chain->bi_next;
758                 }
759
760                 tmp->bi_bdev = NULL;
761                 gfpmask &= ~__GFP_WAIT;
762                 tmp->bi_next = NULL;
763
764                 if (!new_chain) {
765                         new_chain = tail = tmp;
766                 } else {
767                         tail->bi_next = tmp;
768                         tail = tmp;
769                 }
770                 old_chain = old_chain->bi_next;
771
772                 total += tmp->bi_size;
773         }
774
775         BUG_ON(total < len);
776
777         if (tail)
778                 tail->bi_next = NULL;
779
780         *old = old_chain;
781
782         return new_chain;
783
784 err_out:
785         dout("bio_chain_clone with err\n");
786         bio_chain_put(new_chain);
787         return NULL;
788 }
789
790 /*
791  * helpers for osd request op vectors.
792  */
793 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
794                             int num_ops,
795                             int opcode,
796                             u32 payload_len)
797 {
798         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
799                        GFP_NOIO);
800         if (!*ops)
801                 return -ENOMEM;
802         (*ops)[0].op = opcode;
803         /*
804          * op extent offset and length will be set later on
805          * in calc_raw_layout()
806          */
807         (*ops)[0].payload_len = payload_len;
808         return 0;
809 }
810
811 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
812 {
813         kfree(ops);
814 }
815
816 static void rbd_coll_end_req_index(struct request *rq,
817                                    struct rbd_req_coll *coll,
818                                    int index,
819                                    int ret, u64 len)
820 {
821         struct request_queue *q;
822         int min, max, i;
823
824         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
825              coll, index, ret, len);
826
827         if (!rq)
828                 return;
829
830         if (!coll) {
831                 blk_end_request(rq, ret, len);
832                 return;
833         }
834
835         q = rq->q;
836
837         spin_lock_irq(q->queue_lock);
838         coll->status[index].done = 1;
839         coll->status[index].rc = ret;
840         coll->status[index].bytes = len;
841         max = min = coll->num_done;
842         while (max < coll->total && coll->status[max].done)
843                 max++;
844
845         for (i = min; i<max; i++) {
846                 __blk_end_request(rq, coll->status[i].rc,
847                                   coll->status[i].bytes);
848                 coll->num_done++;
849                 kref_put(&coll->kref, rbd_coll_release);
850         }
851         spin_unlock_irq(q->queue_lock);
852 }
853
854 static void rbd_coll_end_req(struct rbd_request *req,
855                              int ret, u64 len)
856 {
857         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
858 }
859
860 /*
861  * Send ceph osd request
862  */
863 static int rbd_do_request(struct request *rq,
864                           struct rbd_device *rbd_dev,
865                           struct ceph_snap_context *snapc,
866                           u64 snapid,
867                           const char *object_name, u64 ofs, u64 len,
868                           struct bio *bio,
869                           struct page **pages,
870                           int num_pages,
871                           int flags,
872                           struct ceph_osd_req_op *ops,
873                           struct rbd_req_coll *coll,
874                           int coll_index,
875                           void (*rbd_cb)(struct ceph_osd_request *req,
876                                          struct ceph_msg *msg),
877                           struct ceph_osd_request **linger_req,
878                           u64 *ver)
879 {
880         struct ceph_osd_request *req;
881         struct ceph_file_layout *layout;
882         int ret;
883         u64 bno;
884         struct timespec mtime = CURRENT_TIME;
885         struct rbd_request *req_data;
886         struct ceph_osd_request_head *reqhead;
887         struct ceph_osd_client *osdc;
888
889         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
890         if (!req_data) {
891                 if (coll)
892                         rbd_coll_end_req_index(rq, coll, coll_index,
893                                                -ENOMEM, len);
894                 return -ENOMEM;
895         }
896
897         if (coll) {
898                 req_data->coll = coll;
899                 req_data->coll_index = coll_index;
900         }
901
902         dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
903                 object_name, len, ofs);
904
905         down_read(&rbd_dev->header_rwsem);
906
907         osdc = &rbd_dev->rbd_client->client->osdc;
908         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
909                                         false, GFP_NOIO, pages, bio);
910         if (!req) {
911                 up_read(&rbd_dev->header_rwsem);
912                 ret = -ENOMEM;
913                 goto done_pages;
914         }
915
916         req->r_callback = rbd_cb;
917
918         req_data->rq = rq;
919         req_data->bio = bio;
920         req_data->pages = pages;
921         req_data->len = len;
922
923         req->r_priv = req_data;
924
925         reqhead = req->r_request->front.iov_base;
926         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
927
928         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
929         req->r_oid_len = strlen(req->r_oid);
930
931         layout = &req->r_file_layout;
932         memset(layout, 0, sizeof(*layout));
933         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
934         layout->fl_stripe_count = cpu_to_le32(1);
935         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
936         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
937         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
938                                 req, ops);
939
940         ceph_osdc_build_request(req, ofs, &len,
941                                 ops,
942                                 snapc,
943                                 &mtime,
944                                 req->r_oid, req->r_oid_len);
945         up_read(&rbd_dev->header_rwsem);
946
947         if (linger_req) {
948                 ceph_osdc_set_request_linger(osdc, req);
949                 *linger_req = req;
950         }
951
952         ret = ceph_osdc_start_request(osdc, req, false);
953         if (ret < 0)
954                 goto done_err;
955
956         if (!rbd_cb) {
957                 ret = ceph_osdc_wait_request(osdc, req);
958                 if (ver)
959                         *ver = le64_to_cpu(req->r_reassert_version.version);
960                 dout("reassert_ver=%lld\n",
961                      le64_to_cpu(req->r_reassert_version.version));
962                 ceph_osdc_put_request(req);
963         }
964         return ret;
965
966 done_err:
967         bio_chain_put(req_data->bio);
968         ceph_osdc_put_request(req);
969 done_pages:
970         rbd_coll_end_req(req_data, ret, len);
971         kfree(req_data);
972         return ret;
973 }
974
975 /*
976  * Ceph osd op callback
977  */
978 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979 {
980         struct rbd_request *req_data = req->r_priv;
981         struct ceph_osd_reply_head *replyhead;
982         struct ceph_osd_op *op;
983         __s32 rc;
984         u64 bytes;
985         int read_op;
986
987         /* parse reply */
988         replyhead = msg->front.iov_base;
989         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
990         op = (void *)(replyhead + 1);
991         rc = le32_to_cpu(replyhead->result);
992         bytes = le64_to_cpu(op->extent.length);
993         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
994
995         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
996
997         if (rc == -ENOENT && read_op) {
998                 zero_bio_chain(req_data->bio, 0);
999                 rc = 0;
1000         } else if (rc == 0 && read_op && bytes < req_data->len) {
1001                 zero_bio_chain(req_data->bio, bytes);
1002                 bytes = req_data->len;
1003         }
1004
1005         rbd_coll_end_req(req_data, rc, bytes);
1006
1007         if (req_data->bio)
1008                 bio_chain_put(req_data->bio);
1009
1010         ceph_osdc_put_request(req);
1011         kfree(req_data);
1012 }
1013
1014 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1015 {
1016         ceph_osdc_put_request(req);
1017 }
1018
1019 /*
1020  * Do a synchronous ceph osd operation
1021  */
1022 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1023                            struct ceph_snap_context *snapc,
1024                            u64 snapid,
1025                            int opcode,
1026                            int flags,
1027                            struct ceph_osd_req_op *orig_ops,
1028                            const char *object_name,
1029                            u64 ofs, u64 len,
1030                            char *buf,
1031                            struct ceph_osd_request **linger_req,
1032                            u64 *ver)
1033 {
1034         int ret;
1035         struct page **pages;
1036         int num_pages;
1037         struct ceph_osd_req_op *ops = orig_ops;
1038         u32 payload_len;
1039
1040         num_pages = calc_pages_for(ofs , len);
1041         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1042         if (IS_ERR(pages))
1043                 return PTR_ERR(pages);
1044
1045         if (!orig_ops) {
1046                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1047                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1048                 if (ret < 0)
1049                         goto done;
1050
1051                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1052                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1053                         if (ret < 0)
1054                                 goto done_ops;
1055                 }
1056         }
1057
1058         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1059                           object_name, ofs, len, NULL,
1060                           pages, num_pages,
1061                           flags,
1062                           ops,
1063                           NULL, 0,
1064                           NULL,
1065                           linger_req, ver);
1066         if (ret < 0)
1067                 goto done_ops;
1068
1069         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071
1072 done_ops:
1073         if (!orig_ops)
1074                 rbd_destroy_ops(ops);
1075 done:
1076         ceph_release_page_vector(pages, num_pages);
1077         return ret;
1078 }
1079
1080 /*
1081  * Do an asynchronous ceph osd operation
1082  */
1083 static int rbd_do_op(struct request *rq,
1084                      struct rbd_device *rbd_dev,
1085                      struct ceph_snap_context *snapc,
1086                      u64 snapid,
1087                      int opcode, int flags,
1088                      u64 ofs, u64 len,
1089                      struct bio *bio,
1090                      struct rbd_req_coll *coll,
1091                      int coll_index)
1092 {
1093         char *seg_name;
1094         u64 seg_ofs;
1095         u64 seg_len;
1096         int ret;
1097         struct ceph_osd_req_op *ops;
1098         u32 payload_len;
1099
1100         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101         if (!seg_name)
1102                 return -ENOMEM;
1103
1104         seg_len = rbd_get_segment(&rbd_dev->header,
1105                                   rbd_dev->header.object_prefix,
1106                                   ofs, len,
1107                                   seg_name, &seg_ofs);
1108
1109         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1110
1111         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112         if (ret < 0)
1113                 goto done;
1114
1115         /* we've taken care of segment sizes earlier when we
1116            cloned the bios. We should never have a segment
1117            truncated at this point */
1118         BUG_ON(seg_len < len);
1119
1120         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121                              seg_name, seg_ofs, seg_len,
1122                              bio,
1123                              NULL, 0,
1124                              flags,
1125                              ops,
1126                              coll, coll_index,
1127                              rbd_req_cb, 0, NULL);
1128
1129         rbd_destroy_ops(ops);
1130 done:
1131         kfree(seg_name);
1132         return ret;
1133 }
1134
1135 /*
1136  * Request async osd write
1137  */
1138 static int rbd_req_write(struct request *rq,
1139                          struct rbd_device *rbd_dev,
1140                          struct ceph_snap_context *snapc,
1141                          u64 ofs, u64 len,
1142                          struct bio *bio,
1143                          struct rbd_req_coll *coll,
1144                          int coll_index)
1145 {
1146         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1147                          CEPH_OSD_OP_WRITE,
1148                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1149                          ofs, len, bio, coll, coll_index);
1150 }
1151
1152 /*
1153  * Request async osd read
1154  */
1155 static int rbd_req_read(struct request *rq,
1156                          struct rbd_device *rbd_dev,
1157                          u64 snapid,
1158                          u64 ofs, u64 len,
1159                          struct bio *bio,
1160                          struct rbd_req_coll *coll,
1161                          int coll_index)
1162 {
1163         return rbd_do_op(rq, rbd_dev, NULL,
1164                          snapid,
1165                          CEPH_OSD_OP_READ,
1166                          CEPH_OSD_FLAG_READ,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174                           struct ceph_snap_context *snapc,
1175                           u64 snapid,
1176                           const char *object_name,
1177                           u64 ofs, u64 len,
1178                           char *buf,
1179                           u64 *ver)
1180 {
1181         return rbd_req_sync_op(rbd_dev, NULL,
1182                                snapid,
1183                                CEPH_OSD_OP_READ,
1184                                CEPH_OSD_FLAG_READ,
1185                                NULL,
1186                                object_name, ofs, len, buf, NULL, ver);
1187 }
1188
1189 /*
1190  * Request sync osd watch
1191  */
1192 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1193                                    u64 ver,
1194                                    u64 notify_id,
1195                                    const char *object_name)
1196 {
1197         struct ceph_osd_req_op *ops;
1198         int ret;
1199
1200         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1201         if (ret < 0)
1202                 return ret;
1203
1204         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1205         ops[0].watch.cookie = notify_id;
1206         ops[0].watch.flag = 0;
1207
1208         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1209                           object_name, 0, 0, NULL,
1210                           NULL, 0,
1211                           CEPH_OSD_FLAG_READ,
1212                           ops,
1213                           NULL, 0,
1214                           rbd_simple_req_cb, 0, NULL);
1215
1216         rbd_destroy_ops(ops);
1217         return ret;
1218 }
1219
1220 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1221 {
1222         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1223         int rc;
1224
1225         if (!rbd_dev)
1226                 return;
1227
1228         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
1229                 rbd_dev->header_name, notify_id, (int) opcode);
1230         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231         rc = __rbd_refresh_header(rbd_dev);
1232         mutex_unlock(&ctl_mutex);
1233         if (rc)
1234                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1235                            " update snaps: %d\n", rbd_dev->major, rc);
1236
1237         rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
1238 }
1239
1240 /*
1241  * Request sync osd watch
1242  */
1243 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1244                               const char *object_name,
1245                               u64 ver)
1246 {
1247         struct ceph_osd_req_op *ops;
1248         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1249
1250         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1251         if (ret < 0)
1252                 return ret;
1253
1254         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1255                                      (void *)rbd_dev, &rbd_dev->watch_event);
1256         if (ret < 0)
1257                 goto fail;
1258
1259         ops[0].watch.ver = cpu_to_le64(ver);
1260         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1261         ops[0].watch.flag = 1;
1262
1263         ret = rbd_req_sync_op(rbd_dev, NULL,
1264                               CEPH_NOSNAP,
1265                               0,
1266                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1267                               ops,
1268                               object_name, 0, 0, NULL,
1269                               &rbd_dev->watch_request, NULL);
1270
1271         if (ret < 0)
1272                 goto fail_event;
1273
1274         rbd_destroy_ops(ops);
1275         return 0;
1276
1277 fail_event:
1278         ceph_osdc_cancel_event(rbd_dev->watch_event);
1279         rbd_dev->watch_event = NULL;
1280 fail:
1281         rbd_destroy_ops(ops);
1282         return ret;
1283 }
1284
1285 /*
1286  * Request sync osd unwatch
1287  */
1288 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1289                                 const char *object_name)
1290 {
1291         struct ceph_osd_req_op *ops;
1292
1293         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1294         if (ret < 0)
1295                 return ret;
1296
1297         ops[0].watch.ver = 0;
1298         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1299         ops[0].watch.flag = 0;
1300
1301         ret = rbd_req_sync_op(rbd_dev, NULL,
1302                               CEPH_NOSNAP,
1303                               0,
1304                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1305                               ops,
1306                               object_name, 0, 0, NULL, NULL, NULL);
1307
1308         rbd_destroy_ops(ops);
1309         ceph_osdc_cancel_event(rbd_dev->watch_event);
1310         rbd_dev->watch_event = NULL;
1311         return ret;
1312 }
1313
1314 struct rbd_notify_info {
1315         struct rbd_device *rbd_dev;
1316 };
1317
1318 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1319 {
1320         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1321         if (!rbd_dev)
1322                 return;
1323
1324         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1325                                 rbd_dev->header_name,
1326                 notify_id, (int)opcode);
1327 }
1328
1329 /*
1330  * Request sync osd notify
1331  */
1332 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1333                           const char *object_name)
1334 {
1335         struct ceph_osd_req_op *ops;
1336         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1337         struct ceph_osd_event *event;
1338         struct rbd_notify_info info;
1339         int payload_len = sizeof(u32) + sizeof(u32);
1340         int ret;
1341
1342         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1343         if (ret < 0)
1344                 return ret;
1345
1346         info.rbd_dev = rbd_dev;
1347
1348         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1349                                      (void *)&info, &event);
1350         if (ret < 0)
1351                 goto fail;
1352
1353         ops[0].watch.ver = 1;
1354         ops[0].watch.flag = 1;
1355         ops[0].watch.cookie = event->cookie;
1356         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1357         ops[0].watch.timeout = 12;
1358
1359         ret = rbd_req_sync_op(rbd_dev, NULL,
1360                                CEPH_NOSNAP,
1361                                0,
1362                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                                ops,
1364                                object_name, 0, 0, NULL, NULL, NULL);
1365         if (ret < 0)
1366                 goto fail_event;
1367
1368         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1369         dout("ceph_osdc_wait_event returned %d\n", ret);
1370         rbd_destroy_ops(ops);
1371         return 0;
1372
1373 fail_event:
1374         ceph_osdc_cancel_event(event);
1375 fail:
1376         rbd_destroy_ops(ops);
1377         return ret;
1378 }
1379
1380 /*
1381  * Request sync osd read
1382  */
1383 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1384                              const char *object_name,
1385                              const char *class_name,
1386                              const char *method_name,
1387                              const char *data,
1388                              int len,
1389                              u64 *ver)
1390 {
1391         struct ceph_osd_req_op *ops;
1392         int class_name_len = strlen(class_name);
1393         int method_name_len = strlen(method_name);
1394         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1395                                     class_name_len + method_name_len + len);
1396         if (ret < 0)
1397                 return ret;
1398
1399         ops[0].cls.class_name = class_name;
1400         ops[0].cls.class_len = (__u8) class_name_len;
1401         ops[0].cls.method_name = method_name;
1402         ops[0].cls.method_len = (__u8) method_name_len;
1403         ops[0].cls.argc = 0;
1404         ops[0].cls.indata = data;
1405         ops[0].cls.indata_len = len;
1406
1407         ret = rbd_req_sync_op(rbd_dev, NULL,
1408                                CEPH_NOSNAP,
1409                                0,
1410                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411                                ops,
1412                                object_name, 0, 0, NULL, NULL, ver);
1413
1414         rbd_destroy_ops(ops);
1415
1416         dout("cls_exec returned %d\n", ret);
1417         return ret;
1418 }
1419
1420 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1421 {
1422         struct rbd_req_coll *coll =
1423                         kzalloc(sizeof(struct rbd_req_coll) +
1424                                 sizeof(struct rbd_req_status) * num_reqs,
1425                                 GFP_ATOMIC);
1426
1427         if (!coll)
1428                 return NULL;
1429         coll->total = num_reqs;
1430         kref_init(&coll->kref);
1431         return coll;
1432 }
1433
1434 /*
1435  * block device queue callback
1436  */
1437 static void rbd_rq_fn(struct request_queue *q)
1438 {
1439         struct rbd_device *rbd_dev = q->queuedata;
1440         struct request *rq;
1441         struct bio_pair *bp = NULL;
1442
1443         while ((rq = blk_fetch_request(q))) {
1444                 struct bio *bio;
1445                 struct bio *rq_bio, *next_bio = NULL;
1446                 bool do_write;
1447                 int size, op_size = 0;
1448                 u64 ofs;
1449                 int num_segs, cur_seg = 0;
1450                 struct rbd_req_coll *coll;
1451
1452                 /* peek at request from block layer */
1453                 if (!rq)
1454                         break;
1455
1456                 dout("fetched request\n");
1457
1458                 /* filter out block requests we don't understand */
1459                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460                         __blk_end_request_all(rq, 0);
1461                         continue;
1462                 }
1463
1464                 /* deduce our operation (read, write) */
1465                 do_write = (rq_data_dir(rq) == WRITE);
1466
1467                 size = blk_rq_bytes(rq);
1468                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1469                 rq_bio = rq->bio;
1470                 if (do_write && rbd_dev->read_only) {
1471                         __blk_end_request_all(rq, -EROFS);
1472                         continue;
1473                 }
1474
1475                 spin_unlock_irq(q->queue_lock);
1476
1477                 if (rbd_dev->snap_id != CEPH_NOSNAP) {
1478                         bool snap_exists;
1479
1480                         down_read(&rbd_dev->header_rwsem);
1481                         snap_exists = rbd_dev->snap_exists;
1482                         up_read(&rbd_dev->header_rwsem);
1483
1484                         if (!snap_exists) {
1485                                 dout("request for non-existent snapshot");
1486                                 spin_lock_irq(q->queue_lock);
1487                                 __blk_end_request_all(rq, -ENXIO);
1488                                 continue;
1489                         }
1490                 }
1491
1492                 dout("%s 0x%x bytes at 0x%llx\n",
1493                      do_write ? "write" : "read",
1494                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1495
1496                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1497                 coll = rbd_alloc_coll(num_segs);
1498                 if (!coll) {
1499                         spin_lock_irq(q->queue_lock);
1500                         __blk_end_request_all(rq, -ENOMEM);
1501                         continue;
1502                 }
1503
1504                 do {
1505                         /* a bio clone to be passed down to OSD req */
1506                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1507                         op_size = rbd_get_segment(&rbd_dev->header,
1508                                                   rbd_dev->header.object_prefix,
1509                                                   ofs, size,
1510                                                   NULL, NULL);
1511                         kref_get(&coll->kref);
1512                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1513                                               op_size, GFP_ATOMIC);
1514                         if (!bio) {
1515                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1516                                                        -ENOMEM, op_size);
1517                                 goto next_seg;
1518                         }
1519
1520
1521                         /* init OSD command: write or read */
1522                         if (do_write)
1523                                 rbd_req_write(rq, rbd_dev,
1524                                               rbd_dev->header.snapc,
1525                                               ofs,
1526                                               op_size, bio,
1527                                               coll, cur_seg);
1528                         else
1529                                 rbd_req_read(rq, rbd_dev,
1530                                              rbd_dev->snap_id,
1531                                              ofs,
1532                                              op_size, bio,
1533                                              coll, cur_seg);
1534
1535 next_seg:
1536                         size -= op_size;
1537                         ofs += op_size;
1538
1539                         cur_seg++;
1540                         rq_bio = next_bio;
1541                 } while (size > 0);
1542                 kref_put(&coll->kref, rbd_coll_release);
1543
1544                 if (bp)
1545                         bio_pair_release(bp);
1546                 spin_lock_irq(q->queue_lock);
1547         }
1548 }
1549
1550 /*
1551  * a queue callback. Makes sure that we don't create a bio that spans across
1552  * multiple osd objects. One exception would be with a single page bios,
1553  * which we handle later at bio_chain_clone
1554  */
1555 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1556                           struct bio_vec *bvec)
1557 {
1558         struct rbd_device *rbd_dev = q->queuedata;
1559         unsigned int chunk_sectors;
1560         sector_t sector;
1561         unsigned int bio_sectors;
1562         int max;
1563
1564         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1565         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1566         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1567
1568         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1569                                  + bio_sectors)) << SECTOR_SHIFT;
1570         if (max < 0)
1571                 max = 0; /* bio_add cannot handle a negative return */
1572         if (max <= bvec->bv_len && bio_sectors == 0)
1573                 return bvec->bv_len;
1574         return max;
1575 }
1576
1577 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 {
1579         struct gendisk *disk = rbd_dev->disk;
1580
1581         if (!disk)
1582                 return;
1583
1584         rbd_header_free(&rbd_dev->header);
1585
1586         if (disk->flags & GENHD_FL_UP)
1587                 del_gendisk(disk);
1588         if (disk->queue)
1589                 blk_cleanup_queue(disk->queue);
1590         put_disk(disk);
1591 }
1592
1593 /*
1594  * reload the ondisk the header 
1595  */
1596 static int rbd_read_header(struct rbd_device *rbd_dev,
1597                            struct rbd_image_header *header)
1598 {
1599         ssize_t rc;
1600         struct rbd_image_header_ondisk *dh;
1601         u32 snap_count = 0;
1602         u64 ver;
1603         size_t len;
1604
1605         /*
1606          * First reads the fixed-size header to determine the number
1607          * of snapshots, then re-reads it, along with all snapshot
1608          * records as well as their stored names.
1609          */
1610         len = sizeof (*dh);
1611         while (1) {
1612                 dh = kmalloc(len, GFP_KERNEL);
1613                 if (!dh)
1614                         return -ENOMEM;
1615
1616                 rc = rbd_req_sync_read(rbd_dev,
1617                                        NULL, CEPH_NOSNAP,
1618                                        rbd_dev->header_name,
1619                                        0, len,
1620                                        (char *)dh, &ver);
1621                 if (rc < 0)
1622                         goto out_dh;
1623
1624                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1625                 if (rc < 0) {
1626                         if (rc == -ENXIO)
1627                                 pr_warning("unrecognized header format"
1628                                            " for image %s\n",
1629                                            rbd_dev->image_name);
1630                         goto out_dh;
1631                 }
1632
1633                 if (snap_count == header->total_snaps)
1634                         break;
1635
1636                 snap_count = header->total_snaps;
1637                 len = sizeof (*dh) +
1638                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1639                         header->snap_names_len;
1640
1641                 rbd_header_free(header);
1642                 kfree(dh);
1643         }
1644         header->obj_version = ver;
1645
1646 out_dh:
1647         kfree(dh);
1648         return rc;
1649 }
1650
1651 /*
1652  * create a snapshot
1653  */
1654 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1655                                const char *snap_name,
1656                                gfp_t gfp_flags)
1657 {
1658         int name_len = strlen(snap_name);
1659         u64 new_snapid;
1660         int ret;
1661         void *data, *p, *e;
1662         u64 ver;
1663         struct ceph_mon_client *monc;
1664
1665         /* we should create a snapshot only if we're pointing at the head */
1666         if (rbd_dev->snap_id != CEPH_NOSNAP)
1667                 return -EINVAL;
1668
1669         monc = &rbd_dev->rbd_client->client->monc;
1670         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1671         dout("created snapid=%lld\n", new_snapid);
1672         if (ret < 0)
1673                 return ret;
1674
1675         data = kmalloc(name_len + 16, gfp_flags);
1676         if (!data)
1677                 return -ENOMEM;
1678
1679         p = data;
1680         e = data + name_len + 16;
1681
1682         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1683         ceph_encode_64_safe(&p, e, new_snapid, bad);
1684
1685         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1686                                 "rbd", "snap_add",
1687                                 data, p - data, &ver);
1688
1689         kfree(data);
1690
1691         if (ret < 0)
1692                 return ret;
1693
1694         down_write(&rbd_dev->header_rwsem);
1695         rbd_dev->header.snapc->seq = new_snapid;
1696         up_write(&rbd_dev->header_rwsem);
1697
1698         return 0;
1699 bad:
1700         return -ERANGE;
1701 }
1702
1703 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1704 {
1705         struct rbd_snap *snap;
1706
1707         while (!list_empty(&rbd_dev->snaps)) {
1708                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1709                 __rbd_remove_snap_dev(rbd_dev, snap);
1710         }
1711 }
1712
1713 /*
1714  * only read the first part of the ondisk header, without the snaps info
1715  */
1716 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1717 {
1718         int ret;
1719         struct rbd_image_header h;
1720         u64 snap_seq;
1721         int follow_seq = 0;
1722
1723         ret = rbd_read_header(rbd_dev, &h);
1724         if (ret < 0)
1725                 return ret;
1726
1727         down_write(&rbd_dev->header_rwsem);
1728
1729         /* resized? */
1730         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1731                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1732
1733                 dout("setting size to %llu sectors", (unsigned long long) size);
1734                 set_capacity(rbd_dev->disk, size);
1735         }
1736
1737         snap_seq = rbd_dev->header.snapc->seq;
1738         if (rbd_dev->header.total_snaps &&
1739             rbd_dev->header.snapc->snaps[0] == snap_seq)
1740                 /* pointing at the head, will need to follow that
1741                    if head moves */
1742                 follow_seq = 1;
1743
1744         /* rbd_dev->header.object_prefix shouldn't change */
1745         kfree(rbd_dev->header.snap_sizes);
1746         kfree(rbd_dev->header.snap_names);
1747         kfree(rbd_dev->header.snapc);
1748
1749         rbd_dev->header.image_size = h.image_size;
1750         rbd_dev->header.total_snaps = h.total_snaps;
1751         rbd_dev->header.snapc = h.snapc;
1752         rbd_dev->header.snap_names = h.snap_names;
1753         rbd_dev->header.snap_names_len = h.snap_names_len;
1754         rbd_dev->header.snap_sizes = h.snap_sizes;
1755         /* Free the extra copy of the object prefix */
1756         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1757         kfree(h.object_prefix);
1758
1759         if (follow_seq)
1760                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1761         else
1762                 rbd_dev->header.snapc->seq = snap_seq;
1763
1764         ret = __rbd_init_snaps_header(rbd_dev);
1765
1766         up_write(&rbd_dev->header_rwsem);
1767
1768         return ret;
1769 }
1770
1771 static int rbd_init_disk(struct rbd_device *rbd_dev)
1772 {
1773         struct gendisk *disk;
1774         struct request_queue *q;
1775         int rc;
1776         u64 segment_size;
1777         u64 total_size = 0;
1778
1779         /* contact OSD, request size info about the object being mapped */
1780         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1781         if (rc)
1782                 return rc;
1783
1784         /* no need to lock here, as rbd_dev is not registered yet */
1785         rc = __rbd_init_snaps_header(rbd_dev);
1786         if (rc)
1787                 return rc;
1788
1789         rc = rbd_header_set_snap(rbd_dev, &total_size);
1790         if (rc)
1791                 return rc;
1792
1793         /* create gendisk info */
1794         rc = -ENOMEM;
1795         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1796         if (!disk)
1797                 goto out;
1798
1799         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1800                  rbd_dev->id);
1801         disk->major = rbd_dev->major;
1802         disk->first_minor = 0;
1803         disk->fops = &rbd_bd_ops;
1804         disk->private_data = rbd_dev;
1805
1806         /* init rq */
1807         rc = -ENOMEM;
1808         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1809         if (!q)
1810                 goto out_disk;
1811
1812         /* We use the default size, but let's be explicit about it. */
1813         blk_queue_physical_block_size(q, SECTOR_SIZE);
1814
1815         /* set io sizes to object size */
1816         segment_size = rbd_obj_bytes(&rbd_dev->header);
1817         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1818         blk_queue_max_segment_size(q, segment_size);
1819         blk_queue_io_min(q, segment_size);
1820         blk_queue_io_opt(q, segment_size);
1821
1822         blk_queue_merge_bvec(q, rbd_merge_bvec);
1823         disk->queue = q;
1824
1825         q->queuedata = rbd_dev;
1826
1827         rbd_dev->disk = disk;
1828         rbd_dev->q = q;
1829
1830         /* finally, announce the disk to the world */
1831         set_capacity(disk, total_size / SECTOR_SIZE);
1832         add_disk(disk);
1833
1834         pr_info("%s: added with size 0x%llx\n",
1835                 disk->disk_name, (unsigned long long)total_size);
1836         return 0;
1837
1838 out_disk:
1839         put_disk(disk);
1840 out:
1841         return rc;
1842 }
1843
1844 /*
1845   sysfs
1846 */
1847
1848 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1849 {
1850         return container_of(dev, struct rbd_device, dev);
1851 }
1852
1853 static ssize_t rbd_size_show(struct device *dev,
1854                              struct device_attribute *attr, char *buf)
1855 {
1856         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1857         sector_t size;
1858
1859         down_read(&rbd_dev->header_rwsem);
1860         size = get_capacity(rbd_dev->disk);
1861         up_read(&rbd_dev->header_rwsem);
1862
1863         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1864 }
1865
1866 static ssize_t rbd_major_show(struct device *dev,
1867                               struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%d\n", rbd_dev->major);
1872 }
1873
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875                                   struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "client%lld\n",
1880                         ceph_client_id(rbd_dev->rbd_client->client));
1881 }
1882
1883 static ssize_t rbd_pool_show(struct device *dev,
1884                              struct device_attribute *attr, char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1889 }
1890
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892                              struct device_attribute *attr, char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1897 }
1898
1899 static ssize_t rbd_name_show(struct device *dev,
1900                              struct device_attribute *attr, char *buf)
1901 {
1902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904         return sprintf(buf, "%s\n", rbd_dev->image_name);
1905 }
1906
1907 static ssize_t rbd_snap_show(struct device *dev,
1908                              struct device_attribute *attr,
1909                              char *buf)
1910 {
1911         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1912
1913         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1914 }
1915
1916 static ssize_t rbd_image_refresh(struct device *dev,
1917                                  struct device_attribute *attr,
1918                                  const char *buf,
1919                                  size_t size)
1920 {
1921         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1922         int rc;
1923         int ret = size;
1924
1925         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1926
1927         rc = __rbd_refresh_header(rbd_dev);
1928         if (rc < 0)
1929                 ret = rc;
1930
1931         mutex_unlock(&ctl_mutex);
1932         return ret;
1933 }
1934
1935 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1936 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1937 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1938 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1939 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1940 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1941 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1942 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1943 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1944
1945 static struct attribute *rbd_attrs[] = {
1946         &dev_attr_size.attr,
1947         &dev_attr_major.attr,
1948         &dev_attr_client_id.attr,
1949         &dev_attr_pool.attr,
1950         &dev_attr_pool_id.attr,
1951         &dev_attr_name.attr,
1952         &dev_attr_current_snap.attr,
1953         &dev_attr_refresh.attr,
1954         &dev_attr_create_snap.attr,
1955         NULL
1956 };
1957
1958 static struct attribute_group rbd_attr_group = {
1959         .attrs = rbd_attrs,
1960 };
1961
1962 static const struct attribute_group *rbd_attr_groups[] = {
1963         &rbd_attr_group,
1964         NULL
1965 };
1966
1967 static void rbd_sysfs_dev_release(struct device *dev)
1968 {
1969 }
1970
1971 static struct device_type rbd_device_type = {
1972         .name           = "rbd",
1973         .groups         = rbd_attr_groups,
1974         .release        = rbd_sysfs_dev_release,
1975 };
1976
1977
1978 /*
1979   sysfs - snapshots
1980 */
1981
1982 static ssize_t rbd_snap_size_show(struct device *dev,
1983                                   struct device_attribute *attr,
1984                                   char *buf)
1985 {
1986         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1987
1988         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1989 }
1990
1991 static ssize_t rbd_snap_id_show(struct device *dev,
1992                                 struct device_attribute *attr,
1993                                 char *buf)
1994 {
1995         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1996
1997         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1998 }
1999
2000 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2001 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2002
2003 static struct attribute *rbd_snap_attrs[] = {
2004         &dev_attr_snap_size.attr,
2005         &dev_attr_snap_id.attr,
2006         NULL,
2007 };
2008
2009 static struct attribute_group rbd_snap_attr_group = {
2010         .attrs = rbd_snap_attrs,
2011 };
2012
2013 static void rbd_snap_dev_release(struct device *dev)
2014 {
2015         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2016         kfree(snap->name);
2017         kfree(snap);
2018 }
2019
2020 static const struct attribute_group *rbd_snap_attr_groups[] = {
2021         &rbd_snap_attr_group,
2022         NULL
2023 };
2024
2025 static struct device_type rbd_snap_device_type = {
2026         .groups         = rbd_snap_attr_groups,
2027         .release        = rbd_snap_dev_release,
2028 };
2029
2030 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2031                                   struct rbd_snap *snap)
2032 {
2033         list_del(&snap->node);
2034         device_unregister(&snap->dev);
2035 }
2036
2037 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2038                                   struct rbd_snap *snap,
2039                                   struct device *parent)
2040 {
2041         struct device *dev = &snap->dev;
2042         int ret;
2043
2044         dev->type = &rbd_snap_device_type;
2045         dev->parent = parent;
2046         dev->release = rbd_snap_dev_release;
2047         dev_set_name(dev, "snap_%s", snap->name);
2048         ret = device_register(dev);
2049
2050         return ret;
2051 }
2052
2053 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2054                               int i, const char *name,
2055                               struct rbd_snap **snapp)
2056 {
2057         int ret;
2058         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2059         if (!snap)
2060                 return -ENOMEM;
2061         snap->name = kstrdup(name, GFP_KERNEL);
2062         snap->size = rbd_dev->header.snap_sizes[i];
2063         snap->id = rbd_dev->header.snapc->snaps[i];
2064         if (device_is_registered(&rbd_dev->dev)) {
2065                 ret = rbd_register_snap_dev(rbd_dev, snap,
2066                                              &rbd_dev->dev);
2067                 if (ret < 0)
2068                         goto err;
2069         }
2070         *snapp = snap;
2071         return 0;
2072 err:
2073         kfree(snap->name);
2074         kfree(snap);
2075         return ret;
2076 }
2077
2078 /*
2079  * search for the previous snap in a null delimited string list
2080  */
2081 const char *rbd_prev_snap_name(const char *name, const char *start)
2082 {
2083         if (name < start + 2)
2084                 return NULL;
2085
2086         name -= 2;
2087         while (*name) {
2088                 if (name == start)
2089                         return start;
2090                 name--;
2091         }
2092         return name + 1;
2093 }
2094
2095 /*
2096  * compare the old list of snapshots that we have to what's in the header
2097  * and update it accordingly. Note that the header holds the snapshots
2098  * in a reverse order (from newest to oldest) and we need to go from
2099  * older to new so that we don't get a duplicate snap name when
2100  * doing the process (e.g., removed snapshot and recreated a new
2101  * one with the same name.
2102  */
2103 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2104 {
2105         const char *name, *first_name;
2106         int i = rbd_dev->header.total_snaps;
2107         struct rbd_snap *snap, *old_snap = NULL;
2108         int ret;
2109         struct list_head *p, *n;
2110
2111         first_name = rbd_dev->header.snap_names;
2112         name = first_name + rbd_dev->header.snap_names_len;
2113
2114         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2115                 u64 cur_id;
2116
2117                 old_snap = list_entry(p, struct rbd_snap, node);
2118
2119                 if (i)
2120                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2121
2122                 if (!i || old_snap->id < cur_id) {
2123                         /*
2124                          * old_snap->id was skipped, thus was
2125                          * removed.  If this rbd_dev is mapped to
2126                          * the removed snapshot, record that it no
2127                          * longer exists, to prevent further I/O.
2128                          */
2129                         if (rbd_dev->snap_id == old_snap->id)
2130                                 rbd_dev->snap_exists = false;
2131                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2132                         continue;
2133                 }
2134                 if (old_snap->id == cur_id) {
2135                         /* we have this snapshot already */
2136                         i--;
2137                         name = rbd_prev_snap_name(name, first_name);
2138                         continue;
2139                 }
2140                 for (; i > 0;
2141                      i--, name = rbd_prev_snap_name(name, first_name)) {
2142                         if (!name) {
2143                                 WARN_ON(1);
2144                                 return -EINVAL;
2145                         }
2146                         cur_id = rbd_dev->header.snapc->snaps[i];
2147                         /* snapshot removal? handle it above */
2148                         if (cur_id >= old_snap->id)
2149                                 break;
2150                         /* a new snapshot */
2151                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2152                         if (ret < 0)
2153                                 return ret;
2154
2155                         /* note that we add it backward so using n and not p */
2156                         list_add(&snap->node, n);
2157                         p = &snap->node;
2158                 }
2159         }
2160         /* we're done going over the old snap list, just add what's left */
2161         for (; i > 0; i--) {
2162                 name = rbd_prev_snap_name(name, first_name);
2163                 if (!name) {
2164                         WARN_ON(1);
2165                         return -EINVAL;
2166                 }
2167                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2168                 if (ret < 0)
2169                         return ret;
2170                 list_add(&snap->node, &rbd_dev->snaps);
2171         }
2172
2173         return 0;
2174 }
2175
2176 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2177 {
2178         int ret;
2179         struct device *dev;
2180         struct rbd_snap *snap;
2181
2182         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2183         dev = &rbd_dev->dev;
2184
2185         dev->bus = &rbd_bus_type;
2186         dev->type = &rbd_device_type;
2187         dev->parent = &rbd_root_dev;
2188         dev->release = rbd_dev_release;
2189         dev_set_name(dev, "%d", rbd_dev->id);
2190         ret = device_register(dev);
2191         if (ret < 0)
2192                 goto out;
2193
2194         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2195                 ret = rbd_register_snap_dev(rbd_dev, snap,
2196                                              &rbd_dev->dev);
2197                 if (ret < 0)
2198                         break;
2199         }
2200 out:
2201         mutex_unlock(&ctl_mutex);
2202         return ret;
2203 }
2204
2205 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2206 {
2207         device_unregister(&rbd_dev->dev);
2208 }
2209
2210 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2211 {
2212         int ret, rc;
2213
2214         do {
2215                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
2216                                          rbd_dev->header.obj_version);
2217                 if (ret == -ERANGE) {
2218                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2219                         rc = __rbd_refresh_header(rbd_dev);
2220                         mutex_unlock(&ctl_mutex);
2221                         if (rc < 0)
2222                                 return rc;
2223                 }
2224         } while (ret == -ERANGE);
2225
2226         return ret;
2227 }
2228
2229 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2230
2231 /*
2232  * Get a unique rbd identifier for the given new rbd_dev, and add
2233  * the rbd_dev to the global list.  The minimum rbd id is 1.
2234  */
2235 static void rbd_id_get(struct rbd_device *rbd_dev)
2236 {
2237         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2238
2239         spin_lock(&rbd_dev_list_lock);
2240         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2241         spin_unlock(&rbd_dev_list_lock);
2242 }
2243
2244 /*
2245  * Remove an rbd_dev from the global list, and record that its
2246  * identifier is no longer in use.
2247  */
2248 static void rbd_id_put(struct rbd_device *rbd_dev)
2249 {
2250         struct list_head *tmp;
2251         int rbd_id = rbd_dev->id;
2252         int max_id;
2253
2254         BUG_ON(rbd_id < 1);
2255
2256         spin_lock(&rbd_dev_list_lock);
2257         list_del_init(&rbd_dev->node);
2258
2259         /*
2260          * If the id being "put" is not the current maximum, there
2261          * is nothing special we need to do.
2262          */
2263         if (rbd_id != atomic64_read(&rbd_id_max)) {
2264                 spin_unlock(&rbd_dev_list_lock);
2265                 return;
2266         }
2267
2268         /*
2269          * We need to update the current maximum id.  Search the
2270          * list to find out what it is.  We're more likely to find
2271          * the maximum at the end, so search the list backward.
2272          */
2273         max_id = 0;
2274         list_for_each_prev(tmp, &rbd_dev_list) {
2275                 struct rbd_device *rbd_dev;
2276
2277                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2278                 if (rbd_id > max_id)
2279                         max_id = rbd_id;
2280         }
2281         spin_unlock(&rbd_dev_list_lock);
2282
2283         /*
2284          * The max id could have been updated by rbd_id_get(), in
2285          * which case it now accurately reflects the new maximum.
2286          * Be careful not to overwrite the maximum value in that
2287          * case.
2288          */
2289         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2290 }
2291
2292 /*
2293  * Skips over white space at *buf, and updates *buf to point to the
2294  * first found non-space character (if any). Returns the length of
2295  * the token (string of non-white space characters) found.  Note
2296  * that *buf must be terminated with '\0'.
2297  */
2298 static inline size_t next_token(const char **buf)
2299 {
2300         /*
2301         * These are the characters that produce nonzero for
2302         * isspace() in the "C" and "POSIX" locales.
2303         */
2304         const char *spaces = " \f\n\r\t\v";
2305
2306         *buf += strspn(*buf, spaces);   /* Find start of token */
2307
2308         return strcspn(*buf, spaces);   /* Return token length */
2309 }
2310
2311 /*
2312  * Finds the next token in *buf, and if the provided token buffer is
2313  * big enough, copies the found token into it.  The result, if
2314  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2315  * must be terminated with '\0' on entry.
2316  *
2317  * Returns the length of the token found (not including the '\0').
2318  * Return value will be 0 if no token is found, and it will be >=
2319  * token_size if the token would not fit.
2320  *
2321  * The *buf pointer will be updated to point beyond the end of the
2322  * found token.  Note that this occurs even if the token buffer is
2323  * too small to hold it.
2324  */
2325 static inline size_t copy_token(const char **buf,
2326                                 char *token,
2327                                 size_t token_size)
2328 {
2329         size_t len;
2330
2331         len = next_token(buf);
2332         if (len < token_size) {
2333                 memcpy(token, *buf, len);
2334                 *(token + len) = '\0';
2335         }
2336         *buf += len;
2337
2338         return len;
2339 }
2340
2341 /*
2342  * Finds the next token in *buf, dynamically allocates a buffer big
2343  * enough to hold a copy of it, and copies the token into the new
2344  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2345  * that a duplicate buffer is created even for a zero-length token.
2346  *
2347  * Returns a pointer to the newly-allocated duplicate, or a null
2348  * pointer if memory for the duplicate was not available.  If
2349  * the lenp argument is a non-null pointer, the length of the token
2350  * (not including the '\0') is returned in *lenp.
2351  *
2352  * If successful, the *buf pointer will be updated to point beyond
2353  * the end of the found token.
2354  *
2355  * Note: uses GFP_KERNEL for allocation.
2356  */
2357 static inline char *dup_token(const char **buf, size_t *lenp)
2358 {
2359         char *dup;
2360         size_t len;
2361
2362         len = next_token(buf);
2363         dup = kmalloc(len + 1, GFP_KERNEL);
2364         if (!dup)
2365                 return NULL;
2366
2367         memcpy(dup, *buf, len);
2368         *(dup + len) = '\0';
2369         *buf += len;
2370
2371         if (lenp)
2372                 *lenp = len;
2373
2374         return dup;
2375 }
2376
2377 /*
2378  * This fills in the pool_name, image_name, image_name_len, snap_name,
2379  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2380  * on the list of monitor addresses and other options provided via
2381  * /sys/bus/rbd/add.
2382  *
2383  * Note: rbd_dev is assumed to have been initially zero-filled.
2384  */
2385 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2386                               const char *buf,
2387                               const char **mon_addrs,
2388                               size_t *mon_addrs_size,
2389                               char *options,
2390                              size_t options_size)
2391 {
2392         size_t len;
2393         int ret;
2394
2395         /* The first four tokens are required */
2396
2397         len = next_token(&buf);
2398         if (!len)
2399                 return -EINVAL;
2400         *mon_addrs_size = len + 1;
2401         *mon_addrs = buf;
2402
2403         buf += len;
2404
2405         len = copy_token(&buf, options, options_size);
2406         if (!len || len >= options_size)
2407                 return -EINVAL;
2408
2409         ret = -ENOMEM;
2410         rbd_dev->pool_name = dup_token(&buf, NULL);
2411         if (!rbd_dev->pool_name)
2412                 goto out_err;
2413
2414         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2415         if (!rbd_dev->image_name)
2416                 goto out_err;
2417
2418         /* Create the name of the header object */
2419
2420         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2421                                                 + sizeof (RBD_SUFFIX),
2422                                         GFP_KERNEL);
2423         if (!rbd_dev->header_name)
2424                 goto out_err;
2425         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2426
2427         /*
2428          * The snapshot name is optional.  If none is is supplied,
2429          * we use the default value.
2430          */
2431         rbd_dev->snap_name = dup_token(&buf, &len);
2432         if (!rbd_dev->snap_name)
2433                 goto out_err;
2434         if (!len) {
2435                 /* Replace the empty name with the default */
2436                 kfree(rbd_dev->snap_name);
2437                 rbd_dev->snap_name
2438                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2439                 if (!rbd_dev->snap_name)
2440                         goto out_err;
2441
2442                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2443                         sizeof (RBD_SNAP_HEAD_NAME));
2444         }
2445
2446         return 0;
2447
2448 out_err:
2449         kfree(rbd_dev->header_name);
2450         kfree(rbd_dev->image_name);
2451         kfree(rbd_dev->pool_name);
2452         rbd_dev->pool_name = NULL;
2453
2454         return ret;
2455 }
2456
2457 static ssize_t rbd_add(struct bus_type *bus,
2458                        const char *buf,
2459                        size_t count)
2460 {
2461         char *options;
2462         struct rbd_device *rbd_dev = NULL;
2463         const char *mon_addrs = NULL;
2464         size_t mon_addrs_size = 0;
2465         struct ceph_osd_client *osdc;
2466         int rc = -ENOMEM;
2467
2468         if (!try_module_get(THIS_MODULE))
2469                 return -ENODEV;
2470
2471         options = kmalloc(count, GFP_KERNEL);
2472         if (!options)
2473                 goto err_nomem;
2474         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2475         if (!rbd_dev)
2476                 goto err_nomem;
2477
2478         /* static rbd_device initialization */
2479         spin_lock_init(&rbd_dev->lock);
2480         INIT_LIST_HEAD(&rbd_dev->node);
2481         INIT_LIST_HEAD(&rbd_dev->snaps);
2482         init_rwsem(&rbd_dev->header_rwsem);
2483
2484         init_rwsem(&rbd_dev->header_rwsem);
2485
2486         /* generate unique id: find highest unique id, add one */
2487         rbd_id_get(rbd_dev);
2488
2489         /* Fill in the device name, now that we have its id. */
2490         BUILD_BUG_ON(DEV_NAME_LEN
2491                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2492         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2493
2494         /* parse add command */
2495         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2496                                 options, count);
2497         if (rc)
2498                 goto err_put_id;
2499
2500         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2501                                                 options);
2502         if (IS_ERR(rbd_dev->rbd_client)) {
2503                 rc = PTR_ERR(rbd_dev->rbd_client);
2504                 goto err_put_id;
2505         }
2506
2507         /* pick the pool */
2508         osdc = &rbd_dev->rbd_client->client->osdc;
2509         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2510         if (rc < 0)
2511                 goto err_out_client;
2512         rbd_dev->pool_id = rc;
2513
2514         /* register our block device */
2515         rc = register_blkdev(0, rbd_dev->name);
2516         if (rc < 0)
2517                 goto err_out_client;
2518         rbd_dev->major = rc;
2519
2520         rc = rbd_bus_add_dev(rbd_dev);
2521         if (rc)
2522                 goto err_out_blkdev;
2523
2524         /*
2525          * At this point cleanup in the event of an error is the job
2526          * of the sysfs code (initiated by rbd_bus_del_dev()).
2527          *
2528          * Set up and announce blkdev mapping.
2529          */
2530         rc = rbd_init_disk(rbd_dev);
2531         if (rc)
2532                 goto err_out_bus;
2533
2534         rc = rbd_init_watch_dev(rbd_dev);
2535         if (rc)
2536                 goto err_out_bus;
2537
2538         return count;
2539
2540 err_out_bus:
2541         /* this will also clean up rest of rbd_dev stuff */
2542
2543         rbd_bus_del_dev(rbd_dev);
2544         kfree(options);
2545         return rc;
2546
2547 err_out_blkdev:
2548         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2549 err_out_client:
2550         rbd_put_client(rbd_dev);
2551 err_put_id:
2552         if (rbd_dev->pool_name) {
2553                 kfree(rbd_dev->snap_name);
2554                 kfree(rbd_dev->header_name);
2555                 kfree(rbd_dev->image_name);
2556                 kfree(rbd_dev->pool_name);
2557         }
2558         rbd_id_put(rbd_dev);
2559 err_nomem:
2560         kfree(rbd_dev);
2561         kfree(options);
2562
2563         dout("Error adding device %s\n", buf);
2564         module_put(THIS_MODULE);
2565
2566         return (ssize_t) rc;
2567 }
2568
2569 static struct rbd_device *__rbd_get_dev(unsigned long id)
2570 {
2571         struct list_head *tmp;
2572         struct rbd_device *rbd_dev;
2573
2574         spin_lock(&rbd_dev_list_lock);
2575         list_for_each(tmp, &rbd_dev_list) {
2576                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2577                 if (rbd_dev->id == id) {
2578                         spin_unlock(&rbd_dev_list_lock);
2579                         return rbd_dev;
2580                 }
2581         }
2582         spin_unlock(&rbd_dev_list_lock);
2583         return NULL;
2584 }
2585
2586 static void rbd_dev_release(struct device *dev)
2587 {
2588         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2589
2590         if (rbd_dev->watch_request) {
2591                 struct ceph_client *client = rbd_dev->rbd_client->client;
2592
2593                 ceph_osdc_unregister_linger_request(&client->osdc,
2594                                                     rbd_dev->watch_request);
2595         }
2596         if (rbd_dev->watch_event)
2597                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
2598
2599         rbd_put_client(rbd_dev);
2600
2601         /* clean up and free blkdev */
2602         rbd_free_disk(rbd_dev);
2603         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2604
2605         /* done with the id, and with the rbd_dev */
2606         kfree(rbd_dev->snap_name);
2607         kfree(rbd_dev->header_name);
2608         kfree(rbd_dev->pool_name);
2609         kfree(rbd_dev->image_name);
2610         rbd_id_put(rbd_dev);
2611         kfree(rbd_dev);
2612
2613         /* release module ref */
2614         module_put(THIS_MODULE);
2615 }
2616
2617 static ssize_t rbd_remove(struct bus_type *bus,
2618                           const char *buf,
2619                           size_t count)
2620 {
2621         struct rbd_device *rbd_dev = NULL;
2622         int target_id, rc;
2623         unsigned long ul;
2624         int ret = count;
2625
2626         rc = strict_strtoul(buf, 10, &ul);
2627         if (rc)
2628                 return rc;
2629
2630         /* convert to int; abort if we lost anything in the conversion */
2631         target_id = (int) ul;
2632         if (target_id != ul)
2633                 return -EINVAL;
2634
2635         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2636
2637         rbd_dev = __rbd_get_dev(target_id);
2638         if (!rbd_dev) {
2639                 ret = -ENOENT;
2640                 goto done;
2641         }
2642
2643         __rbd_remove_all_snaps(rbd_dev);
2644         rbd_bus_del_dev(rbd_dev);
2645
2646 done:
2647         mutex_unlock(&ctl_mutex);
2648         return ret;
2649 }
2650
2651 static ssize_t rbd_snap_add(struct device *dev,
2652                             struct device_attribute *attr,
2653                             const char *buf,
2654                             size_t count)
2655 {
2656         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2657         int ret;
2658         char *name = kmalloc(count + 1, GFP_KERNEL);
2659         if (!name)
2660                 return -ENOMEM;
2661
2662         snprintf(name, count, "%s", buf);
2663
2664         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2665
2666         ret = rbd_header_add_snap(rbd_dev,
2667                                   name, GFP_KERNEL);
2668         if (ret < 0)
2669                 goto err_unlock;
2670
2671         ret = __rbd_refresh_header(rbd_dev);
2672         if (ret < 0)
2673                 goto err_unlock;
2674
2675         /* shouldn't hold ctl_mutex when notifying.. notify might
2676            trigger a watch callback that would need to get that mutex */
2677         mutex_unlock(&ctl_mutex);
2678
2679         /* make a best effort, don't error if failed */
2680         rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
2681
2682         ret = count;
2683         kfree(name);
2684         return ret;
2685
2686 err_unlock:
2687         mutex_unlock(&ctl_mutex);
2688         kfree(name);
2689         return ret;
2690 }
2691
2692 /*
2693  * create control files in sysfs
2694  * /sys/bus/rbd/...
2695  */
2696 static int rbd_sysfs_init(void)
2697 {
2698         int ret;
2699
2700         ret = device_register(&rbd_root_dev);
2701         if (ret < 0)
2702                 return ret;
2703
2704         ret = bus_register(&rbd_bus_type);
2705         if (ret < 0)
2706                 device_unregister(&rbd_root_dev);
2707
2708         return ret;
2709 }
2710
2711 static void rbd_sysfs_cleanup(void)
2712 {
2713         bus_unregister(&rbd_bus_type);
2714         device_unregister(&rbd_root_dev);
2715 }
2716
2717 int __init rbd_init(void)
2718 {
2719         int rc;
2720
2721         rc = rbd_sysfs_init();
2722         if (rc)
2723                 return rc;
2724         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2725         return 0;
2726 }
2727
2728 void __exit rbd_exit(void)
2729 {
2730         rbd_sysfs_cleanup();
2731 }
2732
2733 module_init(rbd_init);
2734 module_exit(rbd_exit);
2735
2736 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2737 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2738 MODULE_DESCRIPTION("rados block device");
2739
2740 /* following authorship retained from original osdblk.c */
2741 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2742
2743 MODULE_LICENSE("GPL");