rbd: drop "object_name" from rbd_req_sync_notify_ack()
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char *object_prefix;		/* NUL-terminated prefix for data object names */
	__u8 obj_order;			/* log2 of object (segment) size */
	__u8 crypt_type;		/* on-disk encryption type */
	__u8 comp_type;			/* on-disk compression type */
	struct ceph_snap_context *snapc; /* refcounted snapshot context (ids) */
	size_t snap_names_len;		/* total bytes of all snapshot names */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* snapshot names, NUL-separated */
	u64 *snap_sizes;		/* per-snapshot image sizes */

	u64 obj_version;		/* version of the on-disk header object */
};
92
/* rbd-specific mount options (see rbd_opts_tokens) */
struct rbd_options {
	int	notify_timeout;		/* "notify_timeout=%d" option, seconds */
};
96
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying libceph client */
	struct rbd_options	*rbd_opts;	/* parsed rbd-specific options */
	struct kref		kref;		/* released via rbd_client_release() */
	struct list_head	node;		/* entry on rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result (0 or negative errno) */
	u64 bytes;	/* number of bytes transferred */
};
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* byte length of this sub-request */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection (may be NULL) */
};
137
/* in-memory representation of one snapshot, exposed through sysfs */
struct rbd_snap {
	struct	device		dev;	/* sysfs device node */
	const char		*name;	/* snapshot name */
	u64			size;	/* image size at snapshot time */
	struct list_head	node;	/* entry on rbd_dev->snaps */
	u64			id;	/* snapshot id */
};
145
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;	/* shared, refcounted ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image metadata */
	char			*image_name;	/* rbd image this device maps */
	size_t			image_name_len;
	char			*header_name;	/* name of the image's header object */
	char			*pool_name;	/* pool the image lives in */
	int			pool_id;	/* numeric id of that pool */

	/* NOTE(review): presumably the watch on the header object used
	 * for update notification — confirm against registration code */
	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* name of the snapshot this device reads from */
	char			*snap_name;
	/* id of the snapshot this device reads from */
	u64			snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool			snap_exists;
	int			read_only;	/* nonzero when mapping a snapshot */

	struct list_head	node;		/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
190
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* forward declarations */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
211
/* sysfs bus files /sys/bus/rbd/{add,remove}; write-only, root only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
222
/*
 * Release callback for rbd_root_dev.  The root device is statically
 * allocated so there is nothing to free; an empty release keeps the
 * driver core from warning when the device is dropped.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
231
232
/* Take a reference on the rbd device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
237
/* Drop a reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
/* Close the block device: drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
267
/* blkdev operations: rbd only implements open and release */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
/*
 * Find a ceph client with specific addr and configuration.
 *
 * Caller must hold rbd_client_list_lock.  Clients created with
 * CEPH_OPT_NOSHARE are never shared, so the lookup short-circuits.
 * Returns the matching client (no reference taken) or NULL.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (!ceph_compare_options(ceph_opts, client_node->client))
			return client_node;
	return NULL;
}
340
/*
 * mount options
 *
 * Ordering matters: parse_rbd_opts_token() treats every token below
 * Opt_last_int as taking an integer argument and every token between
 * Opt_last_int and Opt_last_string as taking a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
358
/*
 * Parse one "key" or "key=value" mount option into *private (a struct
 * rbd_options).  Returns 0 on success, -EINVAL for an unknown token,
 * or the match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* intval is only filled in for tokens below Opt_last_int */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);
	}
	return 0;
}
393
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * The returned client holds a reference the caller drops via
 * rbd_put_client().  ceph_opts ownership: consumed here when reusing
 * an existing client, otherwise handed to rbd_client_create().
 * rbd_opts is freed on every path that does not install it in a new
 * client.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the existing client already owns equivalent options */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
441
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client from
 * rbd_client_list; callers must NOT hold that lock.
 */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after the put */
}
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * On success *header owns four allocations (snapc, snap_names,
 * snap_sizes, object_prefix), released by rbd_header_free().  The
 * snapshot ids, sizes and names are copied only when the caller sized
 * its buffer for exactly the on-disk snapshot count
 * (allocated_snaps == snap_count); otherwise the caller is expected
 * to retry with the right size.
 *
 * Returns 0, -ENXIO for a bad signature, -EINVAL for an implausible
 * snapshot count, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps)
{
	u32 i, snap_count;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/*
	 * Guard the snapc allocation below against size overflow.
	 * NOTE(review): the divisor is sizeof (*ondisk) although the
	 * allocation multiplies by sizeof (u64); stricter than needed
	 * but not unsafe — confirm intent.
	 */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				GFP_KERNEL);
	if (!header->snapc)
		return -ENOMEM;

	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* +1 for the terminating NUL appended below */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					GFP_KERNEL);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* initialize the freshly-built snap context */
	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
569
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571                         u64 *seq, u64 *size)
572 {
573         int i;
574         char *p = header->snap_names;
575
576         for (i = 0; i < header->total_snaps; i++) {
577                 if (!strcmp(snap_name, p)) {
578
579                         /* Found it.  Pass back its id and/or size */
580
581                         if (seq)
582                                 *seq = header->snapc->snaps[i];
583                         if (size)
584                                 *size = header->snap_sizes[i];
585                         return i;
586                 }
587                 p += strlen(p) + 1;     /* Skip ahead to the next name */
588         }
589         return -ENOENT;
590 }
591
/*
 * Select what rbd_dev reads from, based on rbd_dev->snap_name.
 *
 * The special head name ("-", RBD_SNAP_HEAD_NAME) selects the
 * writable base image (CEPH_NOSNAP); any other name must match an
 * existing snapshot and makes the mapping read-only.  Passes back the
 * selected image size through *size when non-NULL.  Returns 0 on
 * success or -ENOENT if the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the base image: writable, no snapshot */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
622
/* Release everything rbd_header_from_disk() allocated into *header */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	ceph_put_snap_context(header->snapc);	/* refcounted, not kfree'd */
}
630
631 /*
632  * get the actual striped segment name, offset and length
633  */
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635                            const char *object_prefix,
636                            u64 ofs, u64 len,
637                            char *seg_name, u64 *segofs)
638 {
639         u64 seg = ofs >> header->obj_order;
640
641         if (seg_name)
642                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643                          "%s.%012llx", object_prefix, seg);
644
645         ofs = ofs & ((1 << header->obj_order) - 1);
646         len = min_t(u64, len, (1 << header->obj_order) - ofs);
647
648         if (segofs)
649                 *segofs = ofs;
650
651         return len;
652 }
653
654 static int rbd_get_num_segments(struct rbd_image_header *header,
655                                 u64 ofs, u64 len)
656 {
657         u64 start_seg = ofs >> header->obj_order;
658         u64 end_seg = (ofs + len - 1) >> header->obj_order;
659         return end_seg - start_seg + 1;
660 }
661
662 /*
663  * returns the size of an object in the image
664  */
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
666 {
667         return 1 << header->obj_order;
668 }
669
670 /*
671  * bio helpers
672  */
673
674 static void bio_chain_put(struct bio *chain)
675 {
676         struct bio *tmp;
677
678         while (chain) {
679                 tmp = chain;
680                 chain = chain->bi_next;
681                 bio_put(tmp);
682         }
683 }
684
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain and clears the bytes
 * at or beyond start_ofs, where start_ofs counts from the beginning
 * of the whole chain.  Used to zero-fill the tail of short reads and
 * reads of nonexistent objects (see rbd_req_cb()).
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs or segment start,
				 * whichever is later */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
711
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until 'len' bytes are covered.  On return
 * *old points at the first unconsumed bio and *next at where the
 * following clone should continue (the second half of a split bio,
 * or the next bio in the chain).  Returns the cloned chain, or NULL
 * on allocation failure (any partial chain is released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release the pair left over from a previous split, if any */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/*
		 * NOTE(review): __GFP_WAIT is cleared after the first
		 * allocation — presumably to avoid blocking mid-chain;
		 * confirm intent.
		 */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append the clone to the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
787
788 /*
789  * helpers for osd request op vectors.
790  */
791 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
792                             int num_ops,
793                             int opcode,
794                             u32 payload_len)
795 {
796         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
797                        GFP_NOIO);
798         if (!*ops)
799                 return -ENOMEM;
800         (*ops)[0].op = opcode;
801         /*
802          * op extent offset and length will be set later on
803          * in calc_raw_layout()
804          */
805         (*ops)[0].payload_len = payload_len;
806         return 0;
807 }
808
/* Free an op vector from rbd_create_rw_ops(); NULL is a no-op */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
813
/*
 * Record the completion of one sub-request in a collection and
 * complete, in submission order, every contiguously-finished portion
 * of the blk-layer request.
 *
 * With no collection (coll == NULL) the blk request is completed
 * directly for 'len' bytes.  The queue lock serializes concurrent
 * completions against each other.  Each completed slot drops one
 * collection reference.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* advance max past every contiguously-completed slot */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
851
/* Complete req's own slot (coll_index) in its collection */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
857
/*
 * Send ceph osd request
 *
 * Builds and submits one OSD request against 'object_name' covering
 * [ofs, ofs+len); data travels through 'bio' or 'pages'.  When rbd_cb
 * is NULL the call is synchronous: the request is waited on, the
 * object version passed back through *ver, and the request released
 * here.  Otherwise rbd_cb owns completion.  A non-NULL linger_req
 * registers the request to linger (resent across osdmap changes) and
 * passes it back to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* fail this slot so the blk request still completes */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/*
	 * NOTE(review): the request head snapid is hardwired to
	 * CEPH_NOSNAP; the 'snapid' argument only feeds
	 * ceph_calc_raw_layout() below — confirm this is intended.
	 */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	/*
	 * NOTE(review): strncpy leaves r_oid unterminated if
	 * object_name fills it exactly, making the strlen below read
	 * past the buffer — object names are presumably bounded well
	 * below sizeof(r_oid); confirm.
	 */
	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* single-object "file" layout sized to the maximum object order */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous request: wait and clean up here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
969
/*
 * Ceph osd op callback
 *
 * Completion callback for async read/write requests issued by
 * rbd_do_op().  Parses the reply header, normalizes failed/short
 * reads into zero-filled data, completes this request's slot in the
 * collection, and drops the references taken when the request was
 * built (bio chain, osd request, private rbd_request).
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* reading a hole (nonexistent object) returns zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the tail and report full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1009
/*
 * Completion callback for requests whose result needs no processing
 * (e.g. notify acks): just drop the osd request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1014
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the payload, builds a single
 * read/write op when the caller supplied no @orig_ops, and issues the
 * request via rbd_do_request() with a NULL callback — which makes
 * rbd_do_request() wait for completion before returning.
 *
 * On a successful read the data is copied from the page vector back
 * into @buf.  Returns the byte count from the wait, or negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* no caller-supplied ops: build one read or write op */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL callback => rbd_do_request() waits for the reply */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	/* only destroy ops we built ourselves, never the caller's */
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1075
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image byte range [ofs, ofs+len) onto its object segment,
 * builds a single read or write op for that segment, and submits it
 * with rbd_req_cb() as the completion callback.  The caller has
 * already split requests at object boundaries (see rbd_rq_fn()), so
 * the range is guaranteed to fit in one segment.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate the image offset into object name, offset, length */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1130
1131 /*
1132  * Request async osd write
1133  */
1134 static int rbd_req_write(struct request *rq,
1135                          struct rbd_device *rbd_dev,
1136                          struct ceph_snap_context *snapc,
1137                          u64 ofs, u64 len,
1138                          struct bio *bio,
1139                          struct rbd_req_coll *coll,
1140                          int coll_index)
1141 {
1142         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1143                          CEPH_OSD_OP_WRITE,
1144                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1145                          ofs, len, bio, coll, coll_index);
1146 }
1147
1148 /*
1149  * Request async osd read
1150  */
1151 static int rbd_req_read(struct request *rq,
1152                          struct rbd_device *rbd_dev,
1153                          u64 snapid,
1154                          u64 ofs, u64 len,
1155                          struct bio *bio,
1156                          struct rbd_req_coll *coll,
1157                          int coll_index)
1158 {
1159         return rbd_do_op(rq, rbd_dev, NULL,
1160                          snapid,
1161                          CEPH_OSD_OP_READ,
1162                          CEPH_OSD_FLAG_READ,
1163                          ofs, len, bio, coll, coll_index);
1164 }
1165
1166 /*
1167  * Request sync osd read
1168  */
1169 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1170                           u64 snapid,
1171                           const char *object_name,
1172                           u64 ofs, u64 len,
1173                           char *buf,
1174                           u64 *ver)
1175 {
1176         return rbd_req_sync_op(rbd_dev, NULL,
1177                                snapid,
1178                                CEPH_OSD_OP_READ,
1179                                CEPH_OSD_FLAG_READ,
1180                                NULL,
1181                                object_name, ofs, len, buf, NULL, ver);
1182 }
1183
/*
 * Acknowledge a notification received on the header object watch.
 * Fire-and-forget: the completion callback just drops the request.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): unlike ver above, notify_id is stored without
	 * cpu_to_le64 — presumably it is already wire byte order as
	 * delivered to the watch callback; confirm against osd_client.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1213
1214 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215 {
1216         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1217         u64 hver;
1218         int rc;
1219
1220         if (!rbd_dev)
1221                 return;
1222
1223         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1224                 rbd_dev->header_name, (unsigned long long) notify_id,
1225                 (unsigned int) opcode);
1226         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1227         rc = __rbd_refresh_header(rbd_dev);
1228         hver = rbd_dev->header.obj_version;
1229         mutex_unlock(&ctl_mutex);
1230         if (rc)
1231                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1232                            " update snaps: %d\n", rbd_dev->major, rc);
1233
1234         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1235 }
1236
/*
 * Request sync osd watch
 *
 * Register a watch on the header object so rbd_watch_cb() runs when
 * another client updates the image header.  The request is set to
 * linger (stored in rbd_dev->watch_request) so the watch persists
 * until rbd_req_sync_unwatch() tears it down.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 => establish the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1280
/*
 * Request sync osd unwatch
 *
 * Remove the watch established by rbd_req_sync_watch() on
 * @object_name and cancel the associated event.  The event is
 * cancelled even when the unwatch op itself fails.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 => remove the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1309
/* private data handed to rbd_notify_cb() via the osd event */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device that sent the notify */
};
1313
1314 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1315 {
1316         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1317         if (!rbd_dev)
1318                 return;
1319
1320         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1321                         rbd_dev->header_name, (unsigned long long) notify_id,
1322                         (unsigned int) opcode);
1323 }
1324
/*
 * Request sync osd notify
 *
 * Send a notify on the header object — waking all watchers, including
 * this client's own watch — and wait until every watcher acks (or the
 * wait times out).
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload carries two u32s: protocol version and timeout */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.rbd_dev = rbd_dev;

	/* one-shot event (third arg 1): completes when all watchers ack */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	/*
	 * NOTE(review): ver and cookie are assigned without cpu_to_le64
	 * here, unlike rbd_req_sync_watch() — confirm intended byte
	 * order on big-endian machines.
	 */
	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* magic timeout; units per notify protocol */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1375
/*
 * Request sync osd class-method call (exec)
 *
 * Invoke @class_name.@method_name on @object_name with @data
 * (length @len) as input.  On success *ver receives the object
 * version.  (Header comment previously mis-described this as a read.)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	/* payload holds the class name, method name and input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1415
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 {
1418         struct rbd_req_coll *coll =
1419                         kzalloc(sizeof(struct rbd_req_coll) +
1420                                 sizeof(struct rbd_req_status) * num_reqs,
1421                                 GFP_ATOMIC);
1422
1423         if (!coll)
1424                 return NULL;
1425         coll->total = num_reqs;
1426         kref_init(&coll->kref);
1427         return coll;
1428 }
1429
/*
 * block device queue callback
 *
 * Pulls requests off the block layer queue, splits each into
 * per-object segments, and submits an async OSD read or write for
 * every segment.  A ref-counted collection (coll) gathers the
 * per-segment completions back into the original block request.
 * Entered with q->queue_lock held; the lock is dropped while talking
 * to the osd layer and re-taken before the next fetch.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock while we issue osd requests */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* mapped snapshot may have been deleted under us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* pin the snap context for the lifetime of this request */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			/* how many bytes until the next object boundary */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* mark this segment failed, move to next */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial collection ref from rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1549
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be appended to the
 * bio described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size in sectors; objects are 2^obj_order bytes */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes left in the object after where the current bio ends */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;	/* single-page bio exception */
	return max;
}
1576
1577 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 {
1579         struct gendisk *disk = rbd_dev->disk;
1580
1581         if (!disk)
1582                 return;
1583
1584         rbd_header_free(&rbd_dev->header);
1585
1586         if (disk->flags & GENHD_FL_UP)
1587                 del_gendisk(disk);
1588         if (disk->queue)
1589                 blk_cleanup_queue(disk->queue);
1590         put_disk(disk);
1591 }
1592
/*
 * Re-read the on-disk image header into @header.
 *
 * Loops until the snapshot count is stable between two reads (a
 * snapshot could be created in the window between sizing the buffer
 * and re-reading).  On success header->obj_version is set to the
 * version reported by the final read.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* done once the count matches what we sized the buffer for */
		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		/* retry with a buffer large enough for all snapshots */
		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1650
1651 /*
1652  * create a snapshot
1653  */
1654 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1655                                const char *snap_name,
1656                                gfp_t gfp_flags)
1657 {
1658         int name_len = strlen(snap_name);
1659         u64 new_snapid;
1660         int ret;
1661         void *data, *p, *e;
1662         u64 ver;
1663         struct ceph_mon_client *monc;
1664
1665         /* we should create a snapshot only if we're pointing at the head */
1666         if (rbd_dev->snap_id != CEPH_NOSNAP)
1667                 return -EINVAL;
1668
1669         monc = &rbd_dev->rbd_client->client->monc;
1670         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1671         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1672         if (ret < 0)
1673                 return ret;
1674
1675         data = kmalloc(name_len + 16, gfp_flags);
1676         if (!data)
1677                 return -ENOMEM;
1678
1679         p = data;
1680         e = data + name_len + 16;
1681
1682         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1683         ceph_encode_64_safe(&p, e, new_snapid, bad);
1684
1685         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1686                                 "rbd", "snap_add",
1687                                 data, p - data, &ver);
1688
1689         kfree(data);
1690
1691         return ret < 0 ? ret : 0;
1692 bad:
1693         return -ERANGE;
1694 }
1695
/*
 * Unregister and free every snapshot device attached to @rbd_dev.
 * Uses the _safe iterator because each entry is unlinked as it goes.
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1704
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header from the osd and folds the fresh fields into
 * rbd_dev->header under header_rwsem, resizing the disk if the image
 * grew or shrank.  Callers visible in this file hold ctl_mutex across
 * this call (see rbd_watch_cb()).
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* rebuild the snapshot device list to match the new header */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1750
/*
 * Create and register the gendisk and request queue for a newly
 * mapped device: reads the image header from the osd, initializes
 * the snapshot list and the mapped snapshot, then announces the disk.
 * Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning object boundaries */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1823
1824 /*
1825   sysfs
1826 */
1827
/* Map a struct device embedded in an rbd_device back to the rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1832
1833 static ssize_t rbd_size_show(struct device *dev,
1834                              struct device_attribute *attr, char *buf)
1835 {
1836         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1837         sector_t size;
1838
1839         down_read(&rbd_dev->header_rwsem);
1840         size = get_capacity(rbd_dev->disk);
1841         up_read(&rbd_dev->header_rwsem);
1842
1843         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1844 }
1845
/* sysfs "major" attribute: block device major number for this mapping. */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
1853
/* sysfs "client_id" attribute: the ceph client instance id, "client<N>". */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1862
/* sysfs "pool" attribute: name of the rados pool holding the image. */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1870
/* sysfs "pool_id" attribute: numeric id of the rados pool. */
static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}
1878
/* sysfs "name" attribute: the rbd image name. */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}
1886
/* sysfs "current_snap" attribute: name of the mapped snapshot (or head). */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1895
1896 static ssize_t rbd_image_refresh(struct device *dev,
1897                                  struct device_attribute *attr,
1898                                  const char *buf,
1899                                  size_t size)
1900 {
1901         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1902         int rc;
1903         int ret = size;
1904
1905         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1906
1907         rc = __rbd_refresh_header(rbd_dev);
1908         if (rc < 0)
1909                 ret = rc;
1910
1911         mutex_unlock(&ctl_mutex);
1912         return ret;
1913 }
1914
/*
 * Per-device attributes: all read-only except "refresh" (re-read the
 * header) and "create_snap" (take a snapshot), which are write-only.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1924
/* Attribute files that appear under /sys/bus/rbd/devices/<id>/ */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list, as required by device_type.groups */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1946
/*
 * Intentionally empty: cleanup is done by rbd_dev_release(), which
 * rbd_bus_add_dev() installs directly as dev->release (and that
 * apparently takes precedence over this type-level hook).
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1956
1957
1958 /*
1959   sysfs - snapshots
1960 */
1961
/* sysfs "snap_size" attribute: the snapshot's image size, in bytes. */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
1970
/* sysfs "snap_id" attribute: the snapshot's numeric id. */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
1979
/* Per-snapshot attributes: size and id, both read-only. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1992
/* Free the rbd_snap once the last reference to its device is dropped. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2009
/*
 * Unlink a snapshot from its device's snap list and unregister it;
 * the rbd_snap itself is freed by rbd_snap_dev_release() when the
 * device's reference count drops to zero.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2015
2016 static int rbd_register_snap_dev(struct rbd_snap *snap,
2017                                   struct device *parent)
2018 {
2019         struct device *dev = &snap->dev;
2020         int ret;
2021
2022         dev->type = &rbd_snap_device_type;
2023         dev->parent = parent;
2024         dev->release = rbd_snap_dev_release;
2025         dev_set_name(dev, "snap_%s", snap->name);
2026         ret = device_register(dev);
2027
2028         return ret;
2029 }
2030
2031 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2032                               int i, const char *name,
2033                               struct rbd_snap **snapp)
2034 {
2035         int ret;
2036         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2037         if (!snap)
2038                 return -ENOMEM;
2039         snap->name = kstrdup(name, GFP_KERNEL);
2040         snap->size = rbd_dev->header.snap_sizes[i];
2041         snap->id = rbd_dev->header.snapc->snaps[i];
2042         if (device_is_registered(&rbd_dev->dev)) {
2043                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2044                 if (ret < 0)
2045                         goto err;
2046         }
2047         *snapp = snap;
2048         return 0;
2049 err:
2050         kfree(snap->name);
2051         kfree(snap);
2052         return ret;
2053 }
2054
2055 /*
2056  * search for the previous snap in a null delimited string list
2057  */
2058 const char *rbd_prev_snap_name(const char *name, const char *start)
2059 {
2060         if (name < start + 2)
2061                 return NULL;
2062
2063         name -= 2;
2064         while (*name) {
2065                 if (name == start)
2066                         return start;
2067                 name--;
2068         }
2069         return name + 1;
2070 }
2071
2072 /*
2073  * compare the old list of snapshots that we have to what's in the header
2074  * and update it accordingly. Note that the header holds the snapshots
2075  * in a reverse order (from newest to oldest) and we need to go from
2076  * older to new so that we don't get a duplicate snap name when
2077  * doing the process (e.g., removed snapshot and recreated a new
2078  * one with the same name.
2079  */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	/* i counts header snapshots not yet matched; header is newest-first */
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* name walks backward through the NUL-delimited snap name list */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only read below when i != 0 (short-circuit) */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed.  If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this reads snaps[i] while the
			 * branches above use snaps[i - 1]; when i is
			 * still total_snaps this indexes one past the
			 * last entry -- confirm intended.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2152
/*
 * Register the rbd device, and any snapshots discovered so far, with
 * sysfs under the rbd bus.  Called with ctl_mutex not held.
 *
 * If registering one of the snapshot devices fails, snapshots already
 * registered are left in place and the error is returned.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	/* device name is the numeric id: /sys/bus/rbd/devices/<id> */
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2180
/* Unregister the device; rbd_dev_release() runs on the final put. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2185
2186 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2187 {
2188         int ret, rc;
2189
2190         do {
2191                 ret = rbd_req_sync_watch(rbd_dev);
2192                 if (ret == -ERANGE) {
2193                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2194                         rc = __rbd_refresh_header(rbd_dev);
2195                         mutex_unlock(&ctl_mutex);
2196                         if (rc < 0)
2197                                 return rc;
2198                 }
2199         } while (ret == -ERANGE);
2200
2201         return ret;
2202 }
2203
/* Highest device id assigned so far; ids start at 1 (see rbd_id_get()). */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2218
2219 /*
2220  * Remove an rbd_dev from the global list, and record that its
2221  * identifier is no longer in use.
2222  */
2223 static void rbd_id_put(struct rbd_device *rbd_dev)
2224 {
2225         struct list_head *tmp;
2226         int rbd_id = rbd_dev->dev_id;
2227         int max_id;
2228
2229         BUG_ON(rbd_id < 1);
2230
2231         spin_lock(&rbd_dev_list_lock);
2232         list_del_init(&rbd_dev->node);
2233
2234         /*
2235          * If the id being "put" is not the current maximum, there
2236          * is nothing special we need to do.
2237          */
2238         if (rbd_id != atomic64_read(&rbd_id_max)) {
2239                 spin_unlock(&rbd_dev_list_lock);
2240                 return;
2241         }
2242
2243         /*
2244          * We need to update the current maximum id.  Search the
2245          * list to find out what it is.  We're more likely to find
2246          * the maximum at the end, so search the list backward.
2247          */
2248         max_id = 0;
2249         list_for_each_prev(tmp, &rbd_dev_list) {
2250                 struct rbd_device *rbd_dev;
2251
2252                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2253                 if (rbd_id > max_id)
2254                         max_id = rbd_id;
2255         }
2256         spin_unlock(&rbd_dev_list_lock);
2257
2258         /*
2259          * The max id could have been updated by rbd_id_get(), in
2260          * which case it now accurately reflects the new maximum.
2261          * Be careful not to overwrite the maximum value in that
2262          * case.
2263          */
2264         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2265 }
2266
2267 /*
2268  * Skips over white space at *buf, and updates *buf to point to the
2269  * first found non-space character (if any). Returns the length of
2270  * the token (string of non-white space characters) found.  Note
2271  * that *buf must be terminated with '\0'.
2272  */
2273 static inline size_t next_token(const char **buf)
2274 {
2275         /*
2276         * These are the characters that produce nonzero for
2277         * isspace() in the "C" and "POSIX" locales.
2278         */
2279         const char *spaces = " \f\n\r\t\v";
2280
2281         *buf += strspn(*buf, spaces);   /* Find start of token */
2282
2283         return strcspn(*buf, spaces);   /* Return token length */
2284 }
2285
2286 /*
2287  * Finds the next token in *buf, and if the provided token buffer is
2288  * big enough, copies the found token into it.  The result, if
2289  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2290  * must be terminated with '\0' on entry.
2291  *
2292  * Returns the length of the token found (not including the '\0').
2293  * Return value will be 0 if no token is found, and it will be >=
2294  * token_size if the token would not fit.
2295  *
2296  * The *buf pointer will be updated to point beyond the end of the
2297  * found token.  Note that this occurs even if the token buffer is
2298  * too small to hold it.
2299  */
2300 static inline size_t copy_token(const char **buf,
2301                                 char *token,
2302                                 size_t token_size)
2303 {
2304         size_t len;
2305
2306         len = next_token(buf);
2307         if (len < token_size) {
2308                 memcpy(token, *buf, len);
2309                 *(token + len) = '\0';
2310         }
2311         *buf += len;
2312
2313         return len;
2314 }
2315
2316 /*
2317  * Finds the next token in *buf, dynamically allocates a buffer big
2318  * enough to hold a copy of it, and copies the token into the new
2319  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2320  * that a duplicate buffer is created even for a zero-length token.
2321  *
2322  * Returns a pointer to the newly-allocated duplicate, or a null
2323  * pointer if memory for the duplicate was not available.  If
2324  * the lenp argument is a non-null pointer, the length of the token
2325  * (not including the '\0') is returned in *lenp.
2326  *
2327  * If successful, the *buf pointer will be updated to point beyond
2328  * the end of the found token.
2329  *
2330  * Note: uses GFP_KERNEL for allocation.
2331  */
2332 static inline char *dup_token(const char **buf, size_t *lenp)
2333 {
2334         char *dup;
2335         size_t len;
2336
2337         len = next_token(buf);
2338         dup = kmalloc(len + 1, GFP_KERNEL);
2339         if (!dup)
2340                 return NULL;
2341
2342         memcpy(dup, *buf, len);
2343         *(dup + len) = '\0';
2344         *buf += len;
2345
2346         if (lenp)
2347                 *lenp = len;
2348
2349         return dup;
2350 }
2351
2352 /*
2353  * This fills in the pool_name, image_name, image_name_len, snap_name,
2354  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2355  * on the list of monitor addresses and other options provided via
2356  * /sys/bus/rbd/add.
2357  *
2358  * Note: rbd_dev is assumed to have been initially zero-filled.
2359  */
2360 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2361                               const char *buf,
2362                               const char **mon_addrs,
2363                               size_t *mon_addrs_size,
2364                               char *options,
2365                              size_t options_size)
2366 {
2367         size_t len;
2368         int ret;
2369
2370         /* The first four tokens are required */
2371
2372         len = next_token(&buf);
2373         if (!len)
2374                 return -EINVAL;
2375         *mon_addrs_size = len + 1;
2376         *mon_addrs = buf;
2377
2378         buf += len;
2379
2380         len = copy_token(&buf, options, options_size);
2381         if (!len || len >= options_size)
2382                 return -EINVAL;
2383
2384         ret = -ENOMEM;
2385         rbd_dev->pool_name = dup_token(&buf, NULL);
2386         if (!rbd_dev->pool_name)
2387                 goto out_err;
2388
2389         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2390         if (!rbd_dev->image_name)
2391                 goto out_err;
2392
2393         /* Create the name of the header object */
2394
2395         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2396                                                 + sizeof (RBD_SUFFIX),
2397                                         GFP_KERNEL);
2398         if (!rbd_dev->header_name)
2399                 goto out_err;
2400         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2401
2402         /*
2403          * The snapshot name is optional.  If none is is supplied,
2404          * we use the default value.
2405          */
2406         rbd_dev->snap_name = dup_token(&buf, &len);
2407         if (!rbd_dev->snap_name)
2408                 goto out_err;
2409         if (!len) {
2410                 /* Replace the empty name with the default */
2411                 kfree(rbd_dev->snap_name);
2412                 rbd_dev->snap_name
2413                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2414                 if (!rbd_dev->snap_name)
2415                         goto out_err;
2416
2417                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2418                         sizeof (RBD_SNAP_HEAD_NAME));
2419         }
2420
2421         return 0;
2422
2423 out_err:
2424         kfree(rbd_dev->header_name);
2425         kfree(rbd_dev->image_name);
2426         kfree(rbd_dev->pool_name);
2427         rbd_dev->pool_name = NULL;
2428
2429         return ret;
2430 }
2431
2432 static ssize_t rbd_add(struct bus_type *bus,
2433                        const char *buf,
2434                        size_t count)
2435 {
2436         char *options;
2437         struct rbd_device *rbd_dev = NULL;
2438         const char *mon_addrs = NULL;
2439         size_t mon_addrs_size = 0;
2440         struct ceph_osd_client *osdc;
2441         int rc = -ENOMEM;
2442
2443         if (!try_module_get(THIS_MODULE))
2444                 return -ENODEV;
2445
2446         options = kmalloc(count, GFP_KERNEL);
2447         if (!options)
2448                 goto err_nomem;
2449         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2450         if (!rbd_dev)
2451                 goto err_nomem;
2452
2453         /* static rbd_device initialization */
2454         spin_lock_init(&rbd_dev->lock);
2455         INIT_LIST_HEAD(&rbd_dev->node);
2456         INIT_LIST_HEAD(&rbd_dev->snaps);
2457         init_rwsem(&rbd_dev->header_rwsem);
2458
2459         /* generate unique id: find highest unique id, add one */
2460         rbd_id_get(rbd_dev);
2461
2462         /* Fill in the device name, now that we have its id. */
2463         BUILD_BUG_ON(DEV_NAME_LEN
2464                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2465         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2466
2467         /* parse add command */
2468         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2469                                 options, count);
2470         if (rc)
2471                 goto err_put_id;
2472
2473         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2474                                                 options);
2475         if (IS_ERR(rbd_dev->rbd_client)) {
2476                 rc = PTR_ERR(rbd_dev->rbd_client);
2477                 goto err_put_id;
2478         }
2479
2480         /* pick the pool */
2481         osdc = &rbd_dev->rbd_client->client->osdc;
2482         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2483         if (rc < 0)
2484                 goto err_out_client;
2485         rbd_dev->pool_id = rc;
2486
2487         /* register our block device */
2488         rc = register_blkdev(0, rbd_dev->name);
2489         if (rc < 0)
2490                 goto err_out_client;
2491         rbd_dev->major = rc;
2492
2493         rc = rbd_bus_add_dev(rbd_dev);
2494         if (rc)
2495                 goto err_out_blkdev;
2496
2497         /*
2498          * At this point cleanup in the event of an error is the job
2499          * of the sysfs code (initiated by rbd_bus_del_dev()).
2500          *
2501          * Set up and announce blkdev mapping.
2502          */
2503         rc = rbd_init_disk(rbd_dev);
2504         if (rc)
2505                 goto err_out_bus;
2506
2507         rc = rbd_init_watch_dev(rbd_dev);
2508         if (rc)
2509                 goto err_out_bus;
2510
2511         return count;
2512
2513 err_out_bus:
2514         /* this will also clean up rest of rbd_dev stuff */
2515
2516         rbd_bus_del_dev(rbd_dev);
2517         kfree(options);
2518         return rc;
2519
2520 err_out_blkdev:
2521         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2522 err_out_client:
2523         rbd_put_client(rbd_dev);
2524 err_put_id:
2525         if (rbd_dev->pool_name) {
2526                 kfree(rbd_dev->snap_name);
2527                 kfree(rbd_dev->header_name);
2528                 kfree(rbd_dev->image_name);
2529                 kfree(rbd_dev->pool_name);
2530         }
2531         rbd_id_put(rbd_dev);
2532 err_nomem:
2533         kfree(rbd_dev);
2534         kfree(options);
2535
2536         dout("Error adding device %s\n", buf);
2537         module_put(THIS_MODULE);
2538
2539         return (ssize_t) rc;
2540 }
2541
2542 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2543 {
2544         struct list_head *tmp;
2545         struct rbd_device *rbd_dev;
2546
2547         spin_lock(&rbd_dev_list_lock);
2548         list_for_each(tmp, &rbd_dev_list) {
2549                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2550                 if (rbd_dev->dev_id == dev_id) {
2551                         spin_unlock(&rbd_dev_list_lock);
2552                         return rbd_dev;
2553                 }
2554         }
2555         spin_unlock(&rbd_dev_list_lock);
2556         return NULL;
2557 }
2558
/*
 * Release callback for the rbd device: tear down the header watch,
 * drop the ceph client, free the gendisk and all name strings, then
 * release the device id and the module reference taken in rbd_add().
 * Runs when the last reference to the device is dropped (initiated
 * by rbd_bus_del_dev()).
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2589
/*
 * sysfs store for /sys/bus/rbd/remove: parse a device id and tear
 * down the corresponding mapping.  Returns the write size on
 * success, -ENOENT when no such device exists, or a parse error.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2623
/*
 * sysfs store for create_snap: create a named snapshot of the image,
 * refresh our copy of the header, then notify other clients
 * (best-effort; a failed notify is not reported).
 *
 * NOTE(review): snprintf(name, count, ...) copies at most count - 1
 * bytes, silently dropping the last character of buf.  That works
 * for the usual newline-terminated sysfs write, but a write without
 * a trailing newline loses its final character -- confirm intended.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2664
2665 /*
2666  * create control files in sysfs
2667  * /sys/bus/rbd/...
2668  */
2669 static int rbd_sysfs_init(void)
2670 {
2671         int ret;
2672
2673         ret = device_register(&rbd_root_dev);
2674         if (ret < 0)
2675                 return ret;
2676
2677         ret = bus_register(&rbd_bus_type);
2678         if (ret < 0)
2679                 device_unregister(&rbd_root_dev);
2680
2681         return ret;
2682 }
2683
/* Tear down /sys/bus/rbd: unregister the bus, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2689
2690 int __init rbd_init(void)
2691 {
2692         int rc;
2693
2694         rc = rbd_sysfs_init();
2695         if (rc)
2696                 return rc;
2697         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2698         return 0;
2699 }
2700
/* Module exit point: remove the sysfs control interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2705
/* Module entry/exit registration and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");