+
/*
rbd.c -- Export ceph rados objects as a Linux block device
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
+#include <linux/bsearch.h>
#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
+#include <linux/slab.h>
#include "rbd_types.h"
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
+/*
+ * Increment the given counter and return its updated value.
+ * If the counter is already 0 it will not be incremented.
+ * If the counter is already at its maximum value returns
+ * -EINVAL without updating it.
+ */
+static int atomic_inc_return_safe(atomic_t *v)
+{
+ unsigned int counter;
+
+ counter = (unsigned int)__atomic_add_unless(v, 1, 0);
+ if (counter <= (unsigned int)INT_MAX)
+ return (int)counter;
+
+ atomic_dec(v);
+
+ return -EINVAL;
+}
+
+/* Decrement the counter. Return the resulting value, or -EINVAL */
+static int atomic_dec_return_safe(atomic_t *v)
+{
+ int counter;
+
+ counter = atomic_dec_return(v);
+ if (counter >= 0)
+ return counter;
+
+ atomic_inc(v);
+
+ return -EINVAL;
+}
+
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
#define RBD_SNAP_HEAD_NAME "-"
+#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
+
/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX 64
* block device image metadata (in-memory version)
*/
struct rbd_image_header {
- /* These four fields never change for a given rbd image */
+ /* These six fields never change for a given rbd image */
char *object_prefix;
- u64 features;
__u8 obj_order;
__u8 crypt_type;
__u8 comp_type;
+ u64 stripe_unit;
+ u64 stripe_count;
+ u64 features; /* Might be changeable someday? */
/* The remaining fields need to be updated occasionally */
u64 image_size;
struct ceph_snap_context *snapc;
- char *snap_names;
- u64 *snap_sizes;
-
- u64 stripe_unit;
- u64 stripe_count;
+ char *snap_names; /* format 1 only */
+ u64 *snap_sizes; /* format 1 only */
};
/*
};
};
struct page **copyup_pages;
+ u32 copyup_page_count;
struct ceph_osd_request *osd_req;
u64 xferred; /* bytes transferred */
- u64 version;
int result;
rbd_obj_callback_t callback;
struct rbd_obj_request *obj_request; /* obj req initiator */
};
struct page **copyup_pages;
+ u32 copyup_page_count;
spinlock_t completion_lock;/* protects next_completion */
u32 next_completion;
rbd_img_callback_t callback;
#define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
-struct rbd_snap {
- const char *name;
- u64 size;
- struct list_head node;
- u64 id;
- u64 features;
-};
-
struct rbd_mapping {
u64 size;
u64 features;
struct rbd_spec *parent_spec;
u64 parent_overlap;
+ atomic_t parent_ref;
struct rbd_device *parent;
/* protects updating the header */
struct list_head node;
- /* list of snapshots */
- struct list_head snaps;
-
/* sysfs related */
struct device dev;
unsigned long open_count; /* protected by lock */
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
+/* Slab caches for frequently-allocated structures */
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
+static struct kmem_cache *rbd_img_request_cache;
+static struct kmem_cache *rbd_obj_request_cache;
+static struct kmem_cache *rbd_segment_name_cache;
+
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
static void rbd_dev_device_release(struct device *dev);
-static void rbd_snap_destroy(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
+static void rbd_spec_put(struct rbd_spec *spec);
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
-static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+ u64 snap_id);
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+ u8 *order, u64 *snap_size);
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+ u64 *snap_features);
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
}
/*
- * Create a new header structure, translate header format from the on-disk
- * header.
+ * Fill an rbd image header with information from the given format 1
+ * on-disk header.
*/
-static int rbd_header_from_disk(struct rbd_image_header *header,
+static int rbd_header_from_disk(struct rbd_device *rbd_dev,
struct rbd_image_header_ondisk *ondisk)
{
+ struct rbd_image_header *header = &rbd_dev->header;
+ bool first_time = header->object_prefix == NULL;
+ struct ceph_snap_context *snapc;
+ char *object_prefix = NULL;
+ char *snap_names = NULL;
+ u64 *snap_sizes = NULL;
u32 snap_count;
- size_t len;
size_t size;
+ int ret = -ENOMEM;
u32 i;
- memset(header, 0, sizeof (*header));
+ /* Allocate this now to avoid having to handle failure below */
- snap_count = le32_to_cpu(ondisk->snap_count);
+ if (first_time) {
+ size_t len;
- len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
- header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
- if (!header->object_prefix)
- return -ENOMEM;
- memcpy(header->object_prefix, ondisk->object_prefix, len);
- header->object_prefix[len] = '\0';
+ len = strnlen(ondisk->object_prefix,
+ sizeof (ondisk->object_prefix));
+ object_prefix = kmalloc(len + 1, GFP_KERNEL);
+ if (!object_prefix)
+ return -ENOMEM;
+ memcpy(object_prefix, ondisk->object_prefix, len);
+ object_prefix[len] = '\0';
+ }
+
+ /* Allocate the snapshot context and fill it in */
+ snap_count = le32_to_cpu(ondisk->snap_count);
+ snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+ if (!snapc)
+ goto out_err;
+ snapc->seq = le64_to_cpu(ondisk->snap_seq);
if (snap_count) {
+ struct rbd_image_snap_ondisk *snaps;
u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
- /* Save a copy of the snapshot names */
+ /* We'll keep a copy of the snapshot names... */
+
+ if (snap_names_len > (u64)SIZE_MAX)
+ goto out_2big;
+ snap_names = kmalloc(snap_names_len, GFP_KERNEL);
+ if (!snap_names)
+ goto out_err;
+
+ /* ...as well as the array of their sizes. */
- if (snap_names_len > (u64) SIZE_MAX)
- return -EIO;
- header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
- if (!header->snap_names)
+ size = snap_count * sizeof (*header->snap_sizes);
+ snap_sizes = kmalloc(size, GFP_KERNEL);
+ if (!snap_sizes)
goto out_err;
+
/*
- * Note that rbd_dev_v1_header_read() guarantees
- * the ondisk buffer we're working with has
+ * Copy the names, and fill in each snapshot's id
+ * and size.
+ *
+ * Note that rbd_dev_v1_header_info() guarantees the
+ * ondisk buffer we're working with has
* snap_names_len bytes beyond the end of the
* snapshot id array, this memcpy() is safe.
*/
- memcpy(header->snap_names, &ondisk->snaps[snap_count],
- snap_names_len);
+ memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
+ snaps = ondisk->snaps;
+ for (i = 0; i < snap_count; i++) {
+ snapc->snaps[i] = le64_to_cpu(snaps[i].id);
+ snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
+ }
+ }
- /* Record each snapshot's size */
+ /* We won't fail any more, fill in the header */
- size = snap_count * sizeof (*header->snap_sizes);
- header->snap_sizes = kmalloc(size, GFP_KERNEL);
- if (!header->snap_sizes)
- goto out_err;
- for (i = 0; i < snap_count; i++)
- header->snap_sizes[i] =
- le64_to_cpu(ondisk->snaps[i].image_size);
+ down_write(&rbd_dev->header_rwsem);
+ if (first_time) {
+ header->object_prefix = object_prefix;
+ header->obj_order = ondisk->options.order;
+ header->crypt_type = ondisk->options.crypt_type;
+ header->comp_type = ondisk->options.comp_type;
+ /* The rest aren't used for format 1 images */
+ header->stripe_unit = 0;
+ header->stripe_count = 0;
+ header->features = 0;
} else {
- header->snap_names = NULL;
- header->snap_sizes = NULL;
+ ceph_put_snap_context(header->snapc);
+ kfree(header->snap_names);
+ kfree(header->snap_sizes);
}
- header->features = 0; /* No features support in v1 images */
- header->obj_order = ondisk->options.order;
- header->crypt_type = ondisk->options.crypt_type;
- header->comp_type = ondisk->options.comp_type;
-
- /* Allocate and fill in the snapshot context */
+ /* The remaining fields always get updated (when we refresh) */
header->image_size = le64_to_cpu(ondisk->image_size);
+ header->snapc = snapc;
+ header->snap_names = snap_names;
+ header->snap_sizes = snap_sizes;
- header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
- if (!header->snapc)
- goto out_err;
- header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
- for (i = 0; i < snap_count; i++)
- header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
+ /* Make sure mapping size is consistent with header info */
- return 0;
+ if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
+ if (rbd_dev->mapping.size != header->image_size)
+ rbd_dev->mapping.size = header->image_size;
+
+ up_write(&rbd_dev->header_rwsem);
+ return 0;
+out_2big:
+ ret = -EIO;
out_err:
- kfree(header->snap_sizes);
- header->snap_sizes = NULL;
- kfree(header->snap_names);
- header->snap_names = NULL;
- kfree(header->object_prefix);
- header->object_prefix = NULL;
+ kfree(snap_sizes);
+ kfree(snap_names);
+ ceph_put_snap_context(snapc);
+ kfree(object_prefix);
- return -ENOMEM;
+ return ret;
}
-static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+ const char *snap_name;
+
+ rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+ /* Skip over names until we find the one we are looking for */
+
+ snap_name = rbd_dev->header.snap_names;
+ while (which--)
+ snap_name += strlen(snap_name) + 1;
+
+ return kstrdup(snap_name, GFP_KERNEL);
+}
+
+/*
+ * Snapshot id comparison function for use with qsort()/bsearch().
+ * Note that result is for snapshots in *descending* order.
+ */
+static int snapid_compare_reverse(const void *s1, const void *s2)
+{
+ u64 snap_id1 = *(u64 *)s1;
+ u64 snap_id2 = *(u64 *)s2;
+
+ if (snap_id1 < snap_id2)
+ return 1;
+ return snap_id1 == snap_id2 ? 0 : -1;
+}
+
+/*
+ * Search a snapshot context to see if the given snapshot id is
+ * present.
+ *
+ * Returns the position of the snapshot id in the array if it's found,
+ * or BAD_SNAP_INDEX otherwise.
+ *
+ * Note: The snapshot array is in kept sorted (by the osd) in
+ * reverse order, highest snapshot id first.
+ */
+static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
+{
+ struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+ u64 *found;
+
+ found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
+ sizeof (snap_id), snapid_compare_reverse);
+
+ return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
+}
+
+static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
+ u64 snap_id)
{
- struct rbd_snap *snap;
+ u32 which;
+
+ which = rbd_dev_snap_index(rbd_dev, snap_id);
+ if (which == BAD_SNAP_INDEX)
+ return NULL;
+
+ return _rbd_dev_v1_snap_name(rbd_dev, which);
+}
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
if (snap_id == CEPH_NOSNAP)
return RBD_SNAP_HEAD_NAME;
- list_for_each_entry(snap, &rbd_dev->snaps, node)
- if (snap_id == snap->id)
- return snap->name;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+ if (rbd_dev->image_format == 1)
+ return rbd_dev_v1_snap_name(rbd_dev, snap_id);
- return NULL;
+ return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
-static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
- const char *snap_name)
+static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+ u64 *snap_size)
{
- struct rbd_snap *snap;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+ if (snap_id == CEPH_NOSNAP) {
+ *snap_size = rbd_dev->header.image_size;
+ } else if (rbd_dev->image_format == 1) {
+ u32 which;
+
+ which = rbd_dev_snap_index(rbd_dev, snap_id);
+ if (which == BAD_SNAP_INDEX)
+ return -ENOENT;
- list_for_each_entry(snap, &rbd_dev->snaps, node)
- if (!strcmp(snap_name, snap->name))
- return snap;
+ *snap_size = rbd_dev->header.snap_sizes[which];
+ } else {
+ u64 size = 0;
+ int ret;
- return NULL;
+ ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
+ if (ret)
+ return ret;
+
+ *snap_size = size;
+ }
+ return 0;
}
-static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
+static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+ u64 *snap_features)
{
- if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
- sizeof (RBD_SNAP_HEAD_NAME))) {
- rbd_dev->mapping.size = rbd_dev->header.image_size;
- rbd_dev->mapping.features = rbd_dev->header.features;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+ if (snap_id == CEPH_NOSNAP) {
+ *snap_features = rbd_dev->header.features;
+ } else if (rbd_dev->image_format == 1) {
+ *snap_features = 0; /* No features for format 1 */
} else {
- struct rbd_snap *snap;
+ u64 features = 0;
+ int ret;
- snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
- if (!snap)
- return -ENOENT;
- rbd_dev->mapping.size = snap->size;
- rbd_dev->mapping.features = snap->features;
- rbd_dev->mapping.read_only = true;
- }
+ ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
+ if (ret)
+ return ret;
+ *snap_features = features;
+ }
return 0;
}
-static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
+static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
- rbd_dev->mapping.size = 0;
- rbd_dev->mapping.features = 0;
- rbd_dev->mapping.read_only = true;
+ u64 snap_id = rbd_dev->spec->snap_id;
+ u64 size = 0;
+ u64 features = 0;
+ int ret;
+
+ ret = rbd_snap_size(rbd_dev, snap_id, &size);
+ if (ret)
+ return ret;
+ ret = rbd_snap_features(rbd_dev, snap_id, &features);
+ if (ret)
+ return ret;
+
+ rbd_dev->mapping.size = size;
+ rbd_dev->mapping.features = features;
+
+ return 0;
}
-static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
+static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
rbd_dev->mapping.size = 0;
rbd_dev->mapping.features = 0;
- rbd_dev->mapping.read_only = true;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
u64 segment;
int ret;
- name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
+ name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
if (!name)
return NULL;
segment = offset >> rbd_dev->header.obj_order;
return name;
}
+static void rbd_segment_name_free(const char *name)
+{
+ /* The explicit cast here is needed to drop the const qualifier */
+
+ kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
- dout("%s: img %p (was %d)\n", __func__, img_request,
- atomic_read(&img_request->kref.refcount));
- kref_get(&img_request->kref);
-}
-
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
rbd_assert(img_request != NULL);
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
- kref_put(&img_request->kref, rbd_img_request_destroy);
+ if (img_request_child_test(img_request))
+ kref_put(&img_request->kref, rbd_parent_request_destroy);
+ else
+ kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
smp_mb();
}
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+ clear_bit(IMG_REQ_CHILD, &img_request->flags);
+ smp_mb();
+}
+
static bool img_request_child_test(struct rbd_img_request *img_request)
{
smp_mb();
smp_mb();
}
+static void img_request_layered_clear(struct rbd_img_request *img_request)
+{
+ clear_bit(IMG_REQ_LAYERED, &img_request->flags);
+ smp_mb();
+}
+
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
smp_mb();
if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
- obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
BUG_ON(osd_req->r_num_ops > 2);
rbd_assert(obj_request_type_valid(type));
size = strlen(object_name) + 1;
- obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
- if (!obj_request)
+ name = kmalloc(size, GFP_KERNEL);
+ if (!name)
+ return NULL;
+
+ obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
+ if (!obj_request) {
+ kfree(name);
return NULL;
+ }
- name = (char *)(obj_request + 1);
obj_request->object_name = memcpy(name, object_name, size);
obj_request->offset = offset;
obj_request->length = length;
break;
}
- kfree(obj_request);
+ kfree(obj_request->object_name);
+ obj_request->object_name = NULL;
+ kmem_cache_free(rbd_obj_request_cache, obj_request);
+}
+
+/* It's OK to call this for a device with no parent */
+
+static void rbd_spec_put(struct rbd_spec *spec);
+static void rbd_dev_unparent(struct rbd_device *rbd_dev)
+{
+ rbd_dev_remove_parent(rbd_dev);
+ rbd_spec_put(rbd_dev->parent_spec);
+ rbd_dev->parent_spec = NULL;
+ rbd_dev->parent_overlap = 0;
+}
+
+/*
+ * Parent image reference counting is used to determine when an
+ * image's parent fields can be safely torn down--after there are no
+ * more in-flight requests to the parent image. When the last
+ * reference is dropped, cleaning them up is safe.
+ */
+static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
+{
+ int counter;
+
+ if (!rbd_dev->parent_spec)
+ return;
+
+ counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
+ if (counter > 0)
+ return;
+
+ /* Last reference; clean up parent data structures */
+
+ if (!counter)
+ rbd_dev_unparent(rbd_dev);
+ else
+ rbd_warn(rbd_dev, "parent reference underflow\n");
+}
+
+/*
+ * If an image has a non-zero parent overlap, get a reference to its
+ * parent.
+ *
+ * We must get the reference before checking for the overlap to
+ * coordinate properly with zeroing the parent overlap in
+ * rbd_dev_v2_parent_info() when an image gets flattened. We
+ * drop it again if there is no overlap.
+ *
+ * Returns true if the rbd device has a parent with a non-zero
+ * overlap and a reference for it was successfully taken, or
+ * false otherwise.
+ */
+static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
+{
+ int counter;
+
+ if (!rbd_dev->parent_spec)
+ return false;
+
+ counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
+ if (counter > 0 && rbd_dev->parent_overlap)
+ return true;
+
+ /* Image was flattened, but parent is not yet torn down */
+
+ if (counter < 0)
+ rbd_warn(rbd_dev, "parent reference overflow\n");
+
+ return false;
}
/*
static struct rbd_img_request *rbd_img_request_create(
struct rbd_device *rbd_dev,
u64 offset, u64 length,
- bool write_request,
- bool child_request)
+ bool write_request)
{
struct rbd_img_request *img_request;
- img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+ img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
if (!img_request)
return NULL;
} else {
img_request->snap_id = rbd_dev->spec->snap_id;
}
- if (child_request)
- img_request_child_set(img_request);
- if (rbd_dev->parent_spec)
+ if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
- rbd_img_request_get(img_request); /* Avoid a warning */
- rbd_img_request_put(img_request); /* TEMPORARY */
-
dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
write_request ? "write" : "read", offset, length,
img_request);
rbd_img_obj_request_del(img_request, obj_request);
rbd_assert(img_request->obj_request_count == 0);
+ if (img_request_layered_test(img_request)) {
+ img_request_layered_clear(img_request);
+ rbd_dev_parent_put(img_request->rbd_dev);
+ }
+
if (img_request_write_test(img_request))
ceph_put_snap_context(img_request->snapc);
- if (img_request_child_test(img_request))
- rbd_obj_request_put(img_request->obj_request);
+ kmem_cache_free(rbd_img_request_cache, img_request);
+}
+
+static struct rbd_img_request *rbd_parent_request_create(
+ struct rbd_obj_request *obj_request,
+ u64 img_offset, u64 length)
+{
+ struct rbd_img_request *parent_request;
+ struct rbd_device *rbd_dev;
+
+ rbd_assert(obj_request->img_request);
+ rbd_dev = obj_request->img_request->rbd_dev;
+
+ parent_request = rbd_img_request_create(rbd_dev->parent,
+ img_offset, length, false);
+ if (!parent_request)
+ return NULL;
+
+ img_request_child_set(parent_request);
+ rbd_obj_request_get(obj_request);
+ parent_request->obj_request = obj_request;
+
+ return parent_request;
+}
+
+static void rbd_parent_request_destroy(struct kref *kref)
+{
+ struct rbd_img_request *parent_request;
+ struct rbd_obj_request *orig_request;
+
+ parent_request = container_of(kref, struct rbd_img_request, kref);
+ orig_request = parent_request->obj_request;
+
+ parent_request->obj_request = NULL;
+ rbd_obj_request_put(orig_request);
+ img_request_child_clear(parent_request);
- kfree(img_request);
+ rbd_img_request_destroy(kref);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
length = rbd_segment_length(rbd_dev, img_offset, resid);
obj_request = rbd_obj_request_create(object_name,
offset, length, type);
- kfree(object_name); /* object request has its own copy */
+ /* object request has its own copy of the object name */
+ rbd_segment_name_free(object_name);
if (!obj_request)
goto out_unwind;
{
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;
- u64 length;
+ struct page **pages;
u32 page_count;
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
- length = (u64)1 << rbd_dev->header.obj_order;
- page_count = (u32)calc_pages_for(0, length);
- rbd_assert(obj_request->copyup_pages);
- ceph_release_page_vector(obj_request->copyup_pages, page_count);
+ pages = obj_request->copyup_pages;
+ rbd_assert(pages != NULL);
obj_request->copyup_pages = NULL;
+ page_count = obj_request->copyup_page_count;
+ rbd_assert(page_count);
+ obj_request->copyup_page_count = 0;
+ ceph_release_page_vector(pages, page_count);
/*
* We want the transfer count to reflect the size of the
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages;
+ u32 page_count;
int result;
- u64 obj_size;
- u64 xferred;
+ u64 parent_length;
+ u64 offset;
+ u64 length;
rbd_assert(img_request_child_test(img_request));
pages = img_request->copyup_pages;
rbd_assert(pages != NULL);
img_request->copyup_pages = NULL;
+ page_count = img_request->copyup_page_count;
+ rbd_assert(page_count);
+ img_request->copyup_page_count = 0;
orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
- rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_type_valid(orig_request->type));
result = img_request->result;
- obj_size = img_request->length;
- xferred = img_request->xferred;
+ parent_length = img_request->length;
+ rbd_assert(parent_length == img_request->xferred);
+ rbd_img_request_put(img_request);
- rbd_dev = img_request->rbd_dev;
+ rbd_assert(orig_request->img_request);
+ rbd_dev = orig_request->img_request->rbd_dev;
rbd_assert(rbd_dev);
- rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
-
- rbd_img_request_put(img_request);
if (result)
goto out_err;
- /* Allocate the new copyup osd request for the original request */
-
+ /*
+ * The original osd request is of no use to use any more.
+ * We need a new one that can hold the two ops in a copyup
+ * request. Allocate the new copyup osd request for the
+ * original request, and release the old one.
+ */
result = -ENOMEM;
- rbd_assert(!orig_request->osd_req);
osd_req = rbd_osd_req_create_copyup(orig_request);
if (!osd_req)
goto out_err;
+ rbd_osd_req_destroy(orig_request->osd_req);
orig_request->osd_req = osd_req;
orig_request->copyup_pages = pages;
+ orig_request->copyup_page_count = page_count;
/* Initialize the copyup op */
osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
- osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+ osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
false, false);
/* Then the original write request op */
+ offset = orig_request->offset;
+ length = orig_request->length;
osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
- orig_request->offset,
- orig_request->length, 0, 0);
- osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
- orig_request->length);
+ offset, length, 0, 0);
+ if (orig_request->type == OBJ_REQUEST_BIO)
+ osd_req_op_extent_osd_data_bio(osd_req, 1,
+ orig_request->bio_list, length);
+ else
+ osd_req_op_extent_osd_data_pages(osd_req, 1,
+ orig_request->pages, length,
+ offset & ~PAGE_MASK, false, false);
rbd_osd_req_format_write(orig_request);
int result;
rbd_assert(obj_request_img_data_test(obj_request));
- rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_type_valid(obj_request->type));
img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
- /*
- * First things first. The original osd request is of no
- * use to use any more, we'll need a new one that can hold
- * the two ops in a copyup request. We'll get that later,
- * but for now we can release the old one.
- */
- rbd_osd_req_destroy(obj_request->osd_req);
- obj_request->osd_req = NULL;
-
/*
* Determine the byte range covered by the object in the
* child image to which the original request was to be sent.
}
result = -ENOMEM;
- parent_request = rbd_img_request_create(rbd_dev->parent,
- img_offset, length,
- false, true);
+ parent_request = rbd_parent_request_create(obj_request,
+ img_offset, length);
if (!parent_request)
goto out_err;
- rbd_obj_request_get(obj_request);
- parent_request->obj_request = obj_request;
result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
if (result)
goto out_err;
parent_request->copyup_pages = pages;
+ parent_request->copyup_page_count = page_count;
parent_request->callback = rbd_img_obj_parent_read_full_callback;
result = rbd_img_request_submit(parent_request);
return 0;
parent_request->copyup_pages = NULL;
+ parent_request->copyup_page_count = 0;
parent_request->obj_request = NULL;
rbd_obj_request_put(obj_request);
out_err:
obj_request->xferred = img_request->xferred;
}
out:
+ rbd_img_request_put(img_request);
rbd_img_obj_request_read_callback(obj_request);
rbd_obj_request_complete(obj_request);
}
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
- struct rbd_device *rbd_dev;
struct rbd_img_request *img_request;
int result;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->img_request != NULL);
rbd_assert(obj_request->result == (s32) -ENOENT);
- rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_type_valid(obj_request->type));
- rbd_dev = obj_request->img_request->rbd_dev;
- rbd_assert(rbd_dev->parent != NULL);
/* rbd_read_finish(obj_request, obj_request->length); */
- img_request = rbd_img_request_create(rbd_dev->parent,
+ img_request = rbd_parent_request_create(obj_request,
obj_request->img_offset,
- obj_request->length,
- false, true);
+ obj_request->length);
result = -ENOMEM;
if (!img_request)
goto out_err;
- rbd_obj_request_get(obj_request);
- img_request->obj_request = obj_request;
-
- result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
- obj_request->bio_list);
+ if (obj_request->type == OBJ_REQUEST_BIO)
+ result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+ obj_request->bio_list);
+ else
+ result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
+ obj_request->pages);
if (result)
goto out_err;
obj_request_done_set(obj_request);
}
-static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
- u64 ver, u64 notify_id)
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
struct rbd_obj_request *obj_request;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
obj_request->callback = rbd_obj_request_put;
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
- notify_id, ver, 0);
+ notify_id, 0, 0);
rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *rbd_dev = (struct rbd_device *)data;
- u64 hver;
+ int ret;
if (!rbd_dev)
return;
dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
- rbd_dev->header_name, (unsigned long long) notify_id,
- (unsigned int) opcode);
- (void)rbd_dev_refresh(rbd_dev, &hver);
+ rbd_dev->header_name, (unsigned long long)notify_id,
+ (unsigned int)opcode);
+ ret = rbd_dev_refresh(rbd_dev);
+ if (ret)
+ rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
- rbd_obj_notify_ack(rbd_dev, hver, notify_id);
+ rbd_obj_notify_ack(rbd_dev, notify_id);
}
/*
* Request sync osd watch/unwatch. The value of "start" determines
* whether a watch request is being initiated or torn down.
*/
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
rbd_dev->watch_request->osd_req);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie, 0, start);
+ rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
const void *outbound,
size_t outbound_size,
void *inbound,
- size_t inbound_size,
- u64 *version)
+ size_t inbound_size)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
rbd_assert(obj_request->xferred < (u64)INT_MAX);
ret = (int)obj_request->xferred;
ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
- if (version)
- *version = obj_request->version;
out:
if (obj_request)
rbd_obj_request_put(obj_request);
goto end_request; /* Shouldn't happen */
}
+ result = -EIO;
+ if (offset + length > rbd_dev->mapping.size) {
+ rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
+ offset, length, rbd_dev->mapping.size);
+ goto end_request;
+ }
+
result = -ENOMEM;
img_request = rbd_img_request_create(rbd_dev, offset, length,
- write_request, false);
+ write_request);
if (!img_request)
goto end_request;
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
const char *object_name,
- u64 offset, u64 length,
- void *buf, u64 *version)
+ u64 offset, u64 length, void *buf)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
size = (size_t) obj_request->xferred;
ceph_copy_from_page_vector(pages, buf, 0, size);
- rbd_assert(size <= (size_t) INT_MAX);
- ret = (int) size;
- if (version)
- *version = obj_request->version;
+ rbd_assert(size <= (size_t)INT_MAX);
+ ret = (int)size;
out:
if (obj_request)
rbd_obj_request_put(obj_request);
}
/*
- * Read the complete header for the given rbd device.
- *
- * Returns a pointer to a dynamically-allocated buffer containing
- * the complete and validated header. Caller can pass the address
- * of a variable that will be filled in with the version of the
- * header object at the time it was read.
- *
- * Returns a pointer-coded errno if a failure occurs.
+ * Read the complete header for the given rbd device. On successful
+ * return, the rbd_dev->header field will contain up-to-date
+ * information about the image.
*/
-static struct rbd_image_header_ondisk *
-rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
struct rbd_image_header_ondisk *ondisk = NULL;
u32 snap_count = 0;
size += names_size;
ondisk = kmalloc(size, GFP_KERNEL);
if (!ondisk)
- return ERR_PTR(-ENOMEM);
+ return -ENOMEM;
ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
- 0, size, ondisk, version);
+ 0, size, ondisk);
if (ret < 0)
- goto out_err;
+ goto out;
if ((size_t)ret < size) {
ret = -ENXIO;
rbd_warn(rbd_dev, "short header read (want %zd got %d)",
size, ret);
- goto out_err;
+ goto out;
}
if (!rbd_dev_ondisk_valid(ondisk)) {
ret = -ENXIO;
rbd_warn(rbd_dev, "invalid header");
- goto out_err;
+ goto out;
}
names_size = le64_to_cpu(ondisk->snap_names_len);
snap_count = le32_to_cpu(ondisk->snap_count);
} while (snap_count != want_count);
- return ondisk;
-
-out_err:
- kfree(ondisk);
-
- return ERR_PTR(ret);
-}
-
-/*
- * reload the ondisk the header
- */
-static int rbd_read_header(struct rbd_device *rbd_dev,
- struct rbd_image_header *header)
-{
- struct rbd_image_header_ondisk *ondisk;
- u64 ver = 0;
- int ret;
-
- ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
- if (IS_ERR(ondisk))
- return PTR_ERR(ondisk);
- ret = rbd_header_from_disk(header, ondisk);
+ ret = rbd_header_from_disk(rbd_dev, ondisk);
+out:
kfree(ondisk);
return ret;
}
-static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
-{
- struct rbd_snap *snap;
- struct rbd_snap *next;
-
- list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
- list_del(&snap->node);
- rbd_snap_destroy(snap);
- }
-}
-
-static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
-{
- if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
- return;
-
- if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
- sector_t size;
-
- rbd_dev->mapping.size = rbd_dev->header.image_size;
- size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
- dout("setting size to %llu sectors", (unsigned long long)size);
- set_capacity(rbd_dev->disk, size);
- }
-}
-
/*
- * only read the first part of the ondisk header, without the snaps info
+ * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
+ * has disappeared from the (just updated) snapshot context.
*/
-static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
+static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
- int ret;
- struct rbd_image_header h;
-
- ret = rbd_read_header(rbd_dev, &h);
- if (ret < 0)
- return ret;
-
- down_write(&rbd_dev->header_rwsem);
-
- /* Update image size, and check for resize of mapped image */
- rbd_dev->header.image_size = h.image_size;
- rbd_update_mapping_size(rbd_dev);
-
- /* rbd_dev->header.object_prefix shouldn't change */
- kfree(rbd_dev->header.snap_sizes);
- kfree(rbd_dev->header.snap_names);
- /* osd requests may still refer to snapc */
- ceph_put_snap_context(rbd_dev->header.snapc);
-
- rbd_dev->header.image_size = h.image_size;
- rbd_dev->header.snapc = h.snapc;
- rbd_dev->header.snap_names = h.snap_names;
- rbd_dev->header.snap_sizes = h.snap_sizes;
- /* Free the extra copy of the object prefix */
- if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
- rbd_warn(rbd_dev, "object prefix changed (ignoring)");
- kfree(h.object_prefix);
+ u64 snap_id;
- ret = rbd_dev_snaps_update(rbd_dev);
+ if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
+ return;
- up_write(&rbd_dev->header_rwsem);
+ snap_id = rbd_dev->spec->snap_id;
+ if (snap_id == CEPH_NOSNAP)
+ return;
- return ret;
+ if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
+ clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}
-static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
- u64 image_size;
+ u64 mapping_size;
int ret;
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
- image_size = rbd_dev->header.image_size;
+ mapping_size = rbd_dev->mapping.size;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
if (rbd_dev->image_format == 1)
- ret = rbd_dev_v1_refresh(rbd_dev, hver);
+ ret = rbd_dev_v1_header_info(rbd_dev);
else
- ret = rbd_dev_v2_refresh(rbd_dev, hver);
+ ret = rbd_dev_v2_header_info(rbd_dev);
+
+ /* If it's a mapped snapshot, validate its EXISTS flag */
+
+ rbd_exists_validate(rbd_dev);
mutex_unlock(&ctl_mutex);
- if (ret)
- rbd_warn(rbd_dev, "got notification but failed to "
- " update snaps: %d\n", ret);
- if (image_size != rbd_dev->header.image_size)
+ if (mapping_size != rbd_dev->mapping.size) {
+ sector_t size;
+
+ size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+ dout("setting size to %llu sectors", (unsigned long long)size);
+ set_capacity(rbd_dev->disk, size);
revalidate_disk(rbd_dev->disk);
+ }
return ret;
}
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
int ret;
- ret = rbd_dev_refresh(rbd_dev, NULL);
+ ret = rbd_dev_refresh(rbd_dev);
+ if (ret)
+ rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
return ret < 0 ? ret : size;
}
spin_lock_init(&rbd_dev->lock);
rbd_dev->flags = 0;
+ atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
- INIT_LIST_HEAD(&rbd_dev->snaps);
init_rwsem(&rbd_dev->header_rwsem);
rbd_dev->spec = spec;
kfree(rbd_dev);
}
-static void rbd_snap_destroy(struct rbd_snap *snap)
-{
- kfree(snap->name);
- kfree(snap);
-}
-
-static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
- const char *snap_name,
- u64 snap_id, u64 snap_size,
- u64 snap_features)
-{
- struct rbd_snap *snap;
-
- snap = kzalloc(sizeof (*snap), GFP_KERNEL);
- if (!snap)
- return ERR_PTR(-ENOMEM);
-
- snap->name = snap_name;
- snap->id = snap_id;
- snap->size = snap_size;
- snap->features = snap_features;
-
- return snap;
-}
-
-/*
- * Returns a dynamically-allocated snapshot name if successful, or a
- * pointer-coded error otherwise.
- */
-static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
- u64 *snap_size, u64 *snap_features)
-{
- const char *snap_name;
- int i;
-
- rbd_assert(which < rbd_dev->header.snapc->num_snaps);
-
- /* Skip over names until we find the one we are looking for */
-
- snap_name = rbd_dev->header.snap_names;
- for (i = 0; i < which; i++)
- snap_name += strlen(snap_name) + 1;
-
- snap_name = kstrdup(snap_name, GFP_KERNEL);
- if (!snap_name)
- return ERR_PTR(-ENOMEM);
-
- *snap_size = rbd_dev->header.snap_sizes[which];
- *snap_features = 0; /* No features for v1 */
-
- return snap_name;
-}
-
/*
* Get the size and object order for an image snapshot, or if
* snap_id is CEPH_NOSNAP, gets this information for the base
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_size",
&snapid, sizeof (snapid),
- &size_buf, sizeof (size_buf), NULL);
+ &size_buf, sizeof (size_buf));
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
return ret;
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_object_prefix", NULL, 0,
- reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
+ reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_features",
&snapid, sizeof (snapid),
- &features_buf, sizeof (features_buf), NULL);
+ &features_buf, sizeof (features_buf));
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
return ret;
__le64 snapid;
void *p;
void *end;
+ u64 pool_id;
char *image_id;
u64 overlap;
int ret;
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent",
&snapid, sizeof (snapid),
- reply_buf, size, NULL);
+ reply_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out_err;
p = reply_buf;
end = reply_buf + ret;
ret = -ERANGE;
- ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
- if (parent_spec->pool_id == CEPH_NOPOOL)
+ ceph_decode_64_safe(&p, end, pool_id, out_err);
+ if (pool_id == CEPH_NOPOOL) {
+ /*
+ * Either the parent never existed, or we have
+ * record of it but the image got flattened so it no
+ * longer has a parent. When the parent of a
+ * layered image disappears we immediately set the
+ * overlap to 0. The effect of this is that all new
+ * requests will be treated as if the image had no
+ * parent.
+ */
+ if (rbd_dev->parent_overlap) {
+ rbd_dev->parent_overlap = 0;
+ smp_mb();
+ rbd_dev_parent_put(rbd_dev);
+ pr_info("%s: clone image has been flattened\n",
+ rbd_dev->disk->disk_name);
+ }
+
goto out; /* No parent? No problem. */
+ }
/* The ceph file layout needs to fit pool id in 32 bits */
ret = -EIO;
- if (parent_spec->pool_id > (u64)U32_MAX) {
+ if (pool_id > (u64)U32_MAX) {
rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
- (unsigned long long)parent_spec->pool_id, U32_MAX);
+ (unsigned long long)pool_id, U32_MAX);
goto out_err;
}
+ parent_spec->pool_id = pool_id;
image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(image_id)) {
ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
ceph_decode_64_safe(&p, end, overlap, out_err);
- rbd_dev->parent_overlap = overlap;
- rbd_dev->parent_spec = parent_spec;
- parent_spec = NULL; /* rbd_dev now owns this */
+ if (overlap) {
+ rbd_spec_put(rbd_dev->parent_spec);
+ rbd_dev->parent_spec = parent_spec;
+ parent_spec = NULL; /* rbd_dev now owns this */
+ rbd_dev->parent_overlap = overlap;
+ } else {
+ rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+ }
out:
ret = 0;
out_err:
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_stripe_unit_count", NULL, 0,
- (char *)&striping_info_buf, size, NULL);
+ (char *)&striping_info_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
return ret;
ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
"rbd", "dir_get_name",
image_id, image_id_size,
- reply_buf, size, NULL);
+ reply_buf, size);
if (ret < 0)
goto out;
p = reply_buf;
return image_name;
}
+static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+ struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+ const char *snap_name;
+ u32 which = 0;
+
+ /* Skip over names until we find the one we are looking for */
+
+ snap_name = rbd_dev->header.snap_names;
+ while (which < snapc->num_snaps) {
+ if (!strcmp(name, snap_name))
+ return snapc->snaps[which];
+ snap_name += strlen(snap_name) + 1;
+ which++;
+ }
+ return CEPH_NOSNAP;
+}
+
+static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+ struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+ u32 which;
+ bool found = false;
+ u64 snap_id;
+
+ for (which = 0; !found && which < snapc->num_snaps; which++) {
+ const char *snap_name;
+
+ snap_id = snapc->snaps[which];
+ snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
+ if (IS_ERR(snap_name))
+ break;
+ found = !strcmp(name, snap_name);
+ kfree(snap_name);
+ }
+ return found ? snap_id : CEPH_NOSNAP;
+}
+
+/*
+ * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
+ * no snapshot by that name is found, or if an error occurs.
+ */
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+ if (rbd_dev->image_format == 1)
+ return rbd_v1_snap_id_by_name(rbd_dev, name);
+
+ return rbd_v2_snap_id_by_name(rbd_dev, name);
+}
+
/*
* When an rbd image has a parent image, it is identified by the
* pool, image, and snapshot ids (not names). This function fills
* When an image being mapped (not a parent) is probed, we have the
* pool name and pool id, image name and image id, and the snapshot
* name. The only thing we're missing is the snapshot id.
- *
- * The set of snapshots for an image is not known until they have
- * been read by rbd_dev_snaps_update(), so we can't completely fill
- * in this information until after that has been called.
*/
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
*/
if (spec->pool_name) {
if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
- struct rbd_snap *snap;
+ u64 snap_id;
- snap = snap_by_name(rbd_dev, spec->snap_name);
- if (!snap)
+ snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+ if (snap_id == CEPH_NOSNAP)
return -ENOENT;
- spec->snap_id = snap->id;
+ spec->snap_id = snap_id;
} else {
spec->snap_id = CEPH_NOSNAP;
}
/* Look up the snapshot name, and make a copy */
snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
- if (!snap_name) {
- rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
- ret = -EIO;
- goto out_err;
- }
- snap_name = kstrdup(snap_name, GFP_KERNEL);
if (!snap_name) {
ret = -ENOMEM;
goto out_err;
return ret;
}
-static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
size_t size;
int ret;
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapcontext", NULL, 0,
- reply_buf, size, ver);
+ reply_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
for (i = 0; i < snap_count; i++)
snapc->snaps[i] = ceph_decode_64(&p);
+ ceph_put_snap_context(rbd_dev->header.snapc);
rbd_dev->header.snapc = snapc;
dout(" snap context seq = %llu, snap_count = %u\n",
return ret;
}
-static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+ u64 snap_id)
{
size_t size;
void *reply_buf;
- __le64 snap_id;
+ __le64 snapid;
int ret;
void *p;
void *end;
if (!reply_buf)
return ERR_PTR(-ENOMEM);
- rbd_assert(which < rbd_dev->header.snapc->num_snaps);
- snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+ snapid = cpu_to_le64(snap_id);
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapshot_name",
- &snap_id, sizeof (snap_id),
- reply_buf, size, NULL);
+ &snapid, sizeof (snapid),
+ reply_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0) {
snap_name = ERR_PTR(ret);
goto out;
dout(" snap_id 0x%016llx snap_name = %s\n",
- (unsigned long long)le64_to_cpu(snap_id), snap_name);
+ (unsigned long long)snap_id, snap_name);
out:
kfree(reply_buf);
return snap_name;
}
-static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
- u64 *snap_size, u64 *snap_features)
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
- u64 snap_id;
- u64 size;
- u64 features;
- const char *snap_name;
+ bool first_time = rbd_dev->header.object_prefix == NULL;
int ret;
- rbd_assert(which < rbd_dev->header.snapc->num_snaps);
- snap_id = rbd_dev->header.snapc->snaps[which];
- ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
- if (ret)
- goto out_err;
-
- ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
- if (ret)
- goto out_err;
+ down_write(&rbd_dev->header_rwsem);
- snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
- if (!IS_ERR(snap_name)) {
- *snap_size = size;
- *snap_features = features;
+ if (first_time) {
+ ret = rbd_dev_v2_header_onetime(rbd_dev);
+ if (ret)
+ goto out;
}
- return snap_name;
-out_err:
- return ERR_PTR(ret);
-}
-
-static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
- u64 *snap_size, u64 *snap_features)
-{
- if (rbd_dev->image_format == 1)
- return rbd_dev_v1_snap_info(rbd_dev, which,
- snap_size, snap_features);
- if (rbd_dev->image_format == 2)
- return rbd_dev_v2_snap_info(rbd_dev, which,
- snap_size, snap_features);
- return ERR_PTR(-EINVAL);
-}
+ /*
+ * If the image supports layering, get the parent info. We
+ * need to probe the first time regardless. Thereafter we
+ * only need to if there's a parent, to see if it has
+ * disappeared due to the mapped image getting flattened.
+ */
+ if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+ (first_time || rbd_dev->parent_spec)) {
+ bool warn;
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
-{
- int ret;
+ ret = rbd_dev_v2_parent_info(rbd_dev);
+ if (ret)
+ goto out;
- down_write(&rbd_dev->header_rwsem);
+ /*
+ * Print a warning if this is the initial probe and
+ * the image has a parent. Don't print it if the
+ * image now being probed is itself a parent. We
+ * can tell at this point because we won't know its
+ * pool name yet (just its pool id).
+ */
+ warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+ if (first_time && warn)
+ rbd_warn(rbd_dev, "WARNING: kernel layering "
+ "is EXPERIMENTAL!");
+ }
ret = rbd_dev_v2_image_size(rbd_dev);
if (ret)
goto out;
- rbd_update_mapping_size(rbd_dev);
- ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+ if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
+ if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+ rbd_dev->mapping.size = rbd_dev->header.image_size;
+
+ ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret);
- if (ret)
- goto out;
- ret = rbd_dev_snaps_update(rbd_dev);
- dout("rbd_dev_snaps_update returned %d\n", ret);
- if (ret)
- goto out;
out:
up_write(&rbd_dev->header_rwsem);
return ret;
}
-/*
- * Scan the rbd device's current snapshot list and compare it to the
- * newly-received snapshot context. Remove any existing snapshots
- * not present in the new snapshot context. Add a new snapshot for
- * any snaphots in the snapshot context not in the current list.
- * And verify there are no changes to snapshots we already know
- * about.
- *
- * Assumes the snapshots in the snapshot context are sorted by
- * snapshot id, highest id first. (Snapshots in the rbd_dev's list
- * are also maintained in that order.)
- *
- * Note that any error occurs while updating the snapshot list
- * aborts the update, and the entire list is cleared. The snapshot
- * list becomes inconsistent at that point anyway, so it might as
- * well be empty.
- */
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
-{
- struct ceph_snap_context *snapc = rbd_dev->header.snapc;
- const u32 snap_count = snapc->num_snaps;
- struct list_head *head = &rbd_dev->snaps;
- struct list_head *links = head->next;
- u32 index = 0;
- int ret = 0;
-
- dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
- while (index < snap_count || links != head) {
- u64 snap_id;
- struct rbd_snap *snap;
- const char *snap_name;
- u64 snap_size = 0;
- u64 snap_features = 0;
-
- snap_id = index < snap_count ? snapc->snaps[index]
- : CEPH_NOSNAP;
- snap = links != head ? list_entry(links, struct rbd_snap, node)
- : NULL;
- rbd_assert(!snap || snap->id != CEPH_NOSNAP);
-
- if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
- struct list_head *next = links->next;
-
- /*
- * A previously-existing snapshot is not in
- * the new snap context.
- *
- * If the now-missing snapshot is the one
- * the image represents, clear its existence
- * flag so we can avoid sending any more
- * requests to it.
- */
- if (rbd_dev->spec->snap_id == snap->id)
- clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
- dout("removing %ssnap id %llu\n",
- rbd_dev->spec->snap_id == snap->id ?
- "mapped " : "",
- (unsigned long long)snap->id);
-
- list_del(&snap->node);
- rbd_snap_destroy(snap);
-
- /* Done with this list entry; advance */
-
- links = next;
- continue;
- }
-
- snap_name = rbd_dev_snap_info(rbd_dev, index,
- &snap_size, &snap_features);
- if (IS_ERR(snap_name)) {
- ret = PTR_ERR(snap_name);
- dout("failed to get snap info, error %d\n", ret);
- goto out_err;
- }
-
- dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
- (unsigned long long)snap_id);
- if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
- struct rbd_snap *new_snap;
-
- /* We haven't seen this snapshot before */
-
- new_snap = rbd_snap_create(rbd_dev, snap_name,
- snap_id, snap_size, snap_features);
- if (IS_ERR(new_snap)) {
- ret = PTR_ERR(new_snap);
- dout(" failed to add dev, error %d\n", ret);
- goto out_err;
- }
-
- /* New goes before existing, or at end of list */
-
- dout(" added dev%s\n", snap ? "" : " at end\n");
- if (snap)
- list_add_tail(&new_snap->node, &snap->node);
- else
- list_add_tail(&new_snap->node, head);
- } else {
- /* Already have this one */
-
- dout(" already present\n");
-
- rbd_assert(snap->size == snap_size);
- rbd_assert(!strcmp(snap->name, snap_name));
- rbd_assert(snap->features == snap_features);
-
- /* Done with this list entry; advance */
-
- links = links->next;
- }
-
- /* Advance to the next entry in the snapshot context */
-
- index++;
- }
- dout("%s: done\n", __func__);
-
- return 0;
-out_err:
- rbd_remove_all_snaps(rbd_dev);
-
- return ret;
-}
-
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
struct device *dev;
ret = rbd_obj_method_sync(rbd_dev, object_name,
"rbd", "get_id", NULL, 0,
- response, RBD_IMAGE_ID_LEN_MAX, NULL);
+ response, RBD_IMAGE_ID_LEN_MAX);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret == -ENOENT) {
image_id = kstrdup("", GFP_KERNEL);
{
struct rbd_image_header *header;
- rbd_dev_remove_parent(rbd_dev);
- rbd_spec_put(rbd_dev->parent_spec);
- rbd_dev->parent_spec = NULL;
- rbd_dev->parent_overlap = 0;
+ /* Drop parent reference unless it's already been done (or none) */
+
+ if (rbd_dev->parent_overlap)
+ rbd_dev_parent_put(rbd_dev);
/* Free dynamic fields from the header, then zero it out */
memset(header, 0, sizeof (*header));
}
-static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
-{
- int ret;
-
- /* Populate rbd image metadata */
-
- ret = rbd_read_header(rbd_dev, &rbd_dev->header);
- if (ret < 0)
- goto out_err;
-
- /* Version 1 images have no parent (no layering) */
-
- rbd_dev->parent_spec = NULL;
- rbd_dev->parent_overlap = 0;
-
- dout("discovered version 1 image, header name is %s\n",
- rbd_dev->header_name);
-
- return 0;
-
-out_err:
- kfree(rbd_dev->header_name);
- rbd_dev->header_name = NULL;
- kfree(rbd_dev->spec->image_id);
- rbd_dev->spec->image_id = NULL;
-
- return ret;
-}
-
-static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
int ret;
- u64 ver = 0;
-
- ret = rbd_dev_v2_image_size(rbd_dev);
- if (ret)
- goto out_err;
-
- /* Get the object prefix (a.k.a. block_name) for the image */
ret = rbd_dev_v2_object_prefix(rbd_dev);
if (ret)
goto out_err;
- /* Get the and check features for the image */
-
+ /*
+ * Get the and check features for the image. Currently the
+ * features are assumed to never change.
+ */
ret = rbd_dev_v2_features(rbd_dev);
if (ret)
goto out_err;
- /* If the image supports layering, get the parent info */
-
- if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
- ret = rbd_dev_v2_parent_info(rbd_dev);
- if (ret)
- goto out_err;
-
- /*
- * Don't print a warning for parent images. We can
- * tell this point because we won't know its pool
- * name yet (just its pool id).
- */
- if (rbd_dev->spec->pool_name)
- rbd_warn(rbd_dev, "WARNING: kernel layering "
- "is EXPERIMENTAL!");
- }
-
/* If the image supports fancy striping, get its parameters */
if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
if (ret < 0)
goto out_err;
}
-
- /* crypto and compression type aren't (yet) supported for v2 images */
-
- rbd_dev->header.crypt_type = 0;
- rbd_dev->header.comp_type = 0;
-
- /* Get the snapshot context, plus the header version */
-
- ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
- if (ret)
- goto out_err;
-
- dout("discovered version 2 image, header name is %s\n",
- rbd_dev->header_name);
+ /* No support for crypto and compression type format 2 images */
return 0;
out_err:
- rbd_dev->parent_overlap = 0;
- rbd_spec_put(rbd_dev->parent_spec);
- rbd_dev->parent_spec = NULL;
- kfree(rbd_dev->header_name);
- rbd_dev->header_name = NULL;
+ rbd_dev->header.features = 0;
kfree(rbd_dev->header.object_prefix);
rbd_dev->header.object_prefix = NULL;
if (!parent)
goto out_err;
- ret = rbd_dev_image_probe(parent);
+ ret = rbd_dev_image_probe(parent, false);
if (ret < 0)
goto out_err;
rbd_dev->parent = parent;
+ atomic_set(&rbd_dev->parent_ref, 1);
return 0;
out_err:
if (parent) {
- rbd_spec_put(rbd_dev->parent_spec);
+ rbd_dev_unparent(rbd_dev);
kfree(rbd_dev->header_name);
rbd_dev_destroy(parent);
} else {
{
int ret;
- ret = rbd_dev_mapping_set(rbd_dev);
- if (ret)
- return ret;
-
/* generate unique id: find highest unique id, add one */
rbd_dev_id_get(rbd_dev);
if (ret)
goto err_out_blkdev;
- ret = rbd_bus_add_dev(rbd_dev);
+ ret = rbd_dev_mapping_set(rbd_dev);
if (ret)
goto err_out_disk;
+ set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+
+ ret = rbd_bus_add_dev(rbd_dev);
+ if (ret)
+ goto err_out_mapping;
/* Everything's ready. Announce the disk to the world. */
- set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
add_disk(rbd_dev->disk);
return ret;
+err_out_mapping:
+ rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
rbd_free_disk(rbd_dev);
err_out_blkdev:
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
- int ret;
-
- rbd_remove_all_snaps(rbd_dev);
rbd_dev_unprobe(rbd_dev);
- ret = rbd_dev_header_watch_sync(rbd_dev, 0);
- if (ret)
- rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
rbd_dev->image_format = 0;
/*
* Probe for the existence of the header object for the given rbd
- * device. For format 2 images this includes determining the image
- * id.
+ * device. If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
*/
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
int ret;
int tmp;
if (ret)
goto err_out_format;
- ret = rbd_dev_header_watch_sync(rbd_dev, 1);
- if (ret)
- goto out_header_name;
+ if (mapping) {
+ ret = rbd_dev_header_watch_sync(rbd_dev, true);
+ if (ret)
+ goto out_header_name;
+ }
if (rbd_dev->image_format == 1)
- ret = rbd_dev_v1_probe(rbd_dev);
+ ret = rbd_dev_v1_header_info(rbd_dev);
else
- ret = rbd_dev_v2_probe(rbd_dev);
+ ret = rbd_dev_v2_header_info(rbd_dev);
if (ret)
goto err_out_watch;
- ret = rbd_dev_snaps_update(rbd_dev);
+ ret = rbd_dev_spec_update(rbd_dev);
if (ret)
goto err_out_probe;
- ret = rbd_dev_spec_update(rbd_dev);
+ ret = rbd_dev_probe_parent(rbd_dev);
if (ret)
- goto err_out_snaps;
+ goto err_out_probe;
- ret = rbd_dev_probe_parent(rbd_dev);
- if (!ret)
- return 0;
+ dout("discovered format %u image, header name is %s\n",
+ rbd_dev->image_format, rbd_dev->header_name);
-err_out_snaps:
- rbd_remove_all_snaps(rbd_dev);
+ return 0;
err_out_probe:
rbd_dev_unprobe(rbd_dev);
err_out_watch:
- tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
- if (tmp)
- rbd_warn(rbd_dev, "unable to tear down watch request\n");
+ if (mapping) {
+ tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+ if (tmp)
+ rbd_warn(rbd_dev, "unable to tear down "
+ "watch request (%d)\n", tmp);
+ }
out_header_name:
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
struct rbd_spec *spec = NULL;
struct rbd_client *rbdc;
struct ceph_osd_client *osdc;
+ bool read_only;
int rc = -ENOMEM;
if (!try_module_get(THIS_MODULE))
rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
if (rc < 0)
goto err_out_module;
+ read_only = rbd_opts->read_only;
+ kfree(rbd_opts);
+ rbd_opts = NULL; /* done with this */
rbdc = rbd_get_client(ceph_opts);
if (IS_ERR(rbdc)) {
rbdc = NULL; /* rbd_dev now owns this */
spec = NULL; /* rbd_dev now owns this */
- rbd_dev->mapping.read_only = rbd_opts->read_only;
- kfree(rbd_opts);
- rbd_opts = NULL; /* done with this */
-
- rc = rbd_dev_image_probe(rbd_dev);
+ rc = rbd_dev_image_probe(rbd_dev, true);
if (rc < 0)
goto err_out_rbd_dev;
+ /* If we are mapping a snapshot it must be marked read-only */
+
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+ read_only = true;
+ rbd_dev->mapping.read_only = read_only;
+
rc = rbd_dev_device_setup(rbd_dev);
if (!rc)
return count;
rbd_free_disk(rbd_dev);
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
- rbd_dev_clear_mapping(rbd_dev);
+ rbd_dev_mapping_clear(rbd_dev);
unregister_blkdev(rbd_dev->major, rbd_dev->name);
rbd_dev->major = 0;
rbd_dev_id_put(rbd_dev);
spin_unlock_irq(&rbd_dev->lock);
if (ret < 0)
goto done;
- ret = count;
rbd_bus_del_dev(rbd_dev);
+ ret = rbd_dev_header_watch_sync(rbd_dev, false);
+ if (ret)
+ rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
rbd_dev_image_release(rbd_dev);
module_put(THIS_MODULE);
+ ret = count;
done:
mutex_unlock(&ctl_mutex);
device_unregister(&rbd_root_dev);
}
+static int rbd_slab_init(void)
+{
+ rbd_assert(!rbd_img_request_cache);
+ rbd_img_request_cache = kmem_cache_create("rbd_img_request",
+ sizeof (struct rbd_img_request),
+ __alignof__(struct rbd_img_request),
+ 0, NULL);
+ if (!rbd_img_request_cache)
+ return -ENOMEM;
+
+ rbd_assert(!rbd_obj_request_cache);
+ rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
+ sizeof (struct rbd_obj_request),
+ __alignof__(struct rbd_obj_request),
+ 0, NULL);
+ if (!rbd_obj_request_cache)
+ goto out_err;
+
+ rbd_assert(!rbd_segment_name_cache);
+ rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
+ MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
+ if (rbd_segment_name_cache)
+ return 0;
+out_err:
+ if (rbd_obj_request_cache) {
+ kmem_cache_destroy(rbd_obj_request_cache);
+ rbd_obj_request_cache = NULL;
+ }
+
+ kmem_cache_destroy(rbd_img_request_cache);
+ rbd_img_request_cache = NULL;
+
+ return -ENOMEM;
+}
+
+static void rbd_slab_exit(void)
+{
+ rbd_assert(rbd_segment_name_cache);
+ kmem_cache_destroy(rbd_segment_name_cache);
+ rbd_segment_name_cache = NULL;
+
+ rbd_assert(rbd_obj_request_cache);
+ kmem_cache_destroy(rbd_obj_request_cache);
+ rbd_obj_request_cache = NULL;
+
+ rbd_assert(rbd_img_request_cache);
+ kmem_cache_destroy(rbd_img_request_cache);
+ rbd_img_request_cache = NULL;
+}
+
static int __init rbd_init(void)
{
int rc;
return -EINVAL;
}
- rc = rbd_sysfs_init();
+ rc = rbd_slab_init();
if (rc)
return rc;
- pr_info("loaded " RBD_DRV_NAME_LONG "\n");
- return 0;
+ rc = rbd_sysfs_init();
+ if (rc)
+ rbd_slab_exit();
+ else
+ pr_info("loaded " RBD_DRV_NAME_LONG "\n");
+
+ return rc;
}
static void __exit rbd_exit(void)
{
rbd_sysfs_cleanup();
+ rbd_slab_exit();
}
module_init(rbd_init);