libceph: add support for primary_temp mappings
[firefly-linux-kernel-4.4.55.git] / net / ceph / osdmap.c
index aade4a5c1c07f6ab0ca0f1ee66dfacbded110aed..20a38a37794cedd554c23f70ca97e02748e3d4a1 100644 (file)
@@ -343,7 +343,7 @@ bad:
 
 /*
  * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
- * to a set of osds)
+ * to a set of osds) and primary_temp (explicit primary setting)
  */
 static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
 {
@@ -506,7 +506,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
        kfree(pi);
 }
 
-static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
+static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 {
        u8 ev, cv;
        unsigned len, num;
@@ -587,7 +587,7 @@ bad:
        return -EINVAL;
 }
 
-static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
+static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 {
        struct ceph_pg_pool_info *pi;
        u32 num, len;
@@ -633,6 +633,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
                rb_erase(&pg->node, &map->pg_temp);
                kfree(pg);
        }
+       while (!RB_EMPTY_ROOT(&map->primary_temp)) {
+               struct ceph_pg_mapping *pg =
+                       rb_entry(rb_first(&map->primary_temp),
+                                struct ceph_pg_mapping, node);
+               rb_erase(&pg->node, &map->primary_temp);
+               kfree(pg);
+       }
        while (!RB_EMPTY_ROOT(&map->pg_pools)) {
                struct ceph_pg_pool_info *pi =
                        rb_entry(rb_first(&map->pg_pools),
@@ -642,186 +649,514 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
        kfree(map->osd_state);
        kfree(map->osd_weight);
        kfree(map->osd_addr);
+       kfree(map->osd_primary_affinity);
        kfree(map);
 }
 
 /*
- * adjust max osd value.  reallocate arrays.
+ * Adjust max_osd value, (re)allocate arrays.
+ *
+ * The new elements are properly initialized.
  */
 static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 {
        u8 *state;
-       struct ceph_entity_addr *addr;
        u32 *weight;
+       struct ceph_entity_addr *addr;
+       int i;
 
-       state = kcalloc(max, sizeof(*state), GFP_NOFS);
-       addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
-       weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
-       if (state == NULL || addr == NULL || weight == NULL) {
+       state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
+       weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
+       addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
+       if (!state || !weight || !addr) {
                kfree(state);
-               kfree(addr);
                kfree(weight);
+               kfree(addr);
+
                return -ENOMEM;
        }
 
-       /* copy old? */
-       if (map->osd_state) {
-               memcpy(state, map->osd_state, map->max_osd*sizeof(*state));
-               memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr));
-               memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
-               kfree(map->osd_state);
-               kfree(map->osd_addr);
-               kfree(map->osd_weight);
+       for (i = map->max_osd; i < max; i++) {
+               state[i] = 0;
+               weight[i] = CEPH_OSD_OUT;
+               memset(addr + i, 0, sizeof(*addr));
        }
 
        map->osd_state = state;
        map->osd_weight = weight;
        map->osd_addr = addr;
+
+       if (map->osd_primary_affinity) {
+               u32 *affinity;
+
+               affinity = krealloc(map->osd_primary_affinity,
+                                   max*sizeof(*affinity), GFP_NOFS);
+               if (!affinity)
+                       return -ENOMEM;
+
+               for (i = map->max_osd; i < max; i++)
+                       affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+
+               map->osd_primary_affinity = affinity;
+       }
+
        map->max_osd = max;
+
        return 0;
 }
 
+#define OSDMAP_WRAPPER_COMPAT_VER      7
+#define OSDMAP_CLIENT_DATA_COMPAT_VER  1
+
 /*
- * decode a full map.
+ * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
+ * to struct_v of the client_data section for new (v7 and above)
+ * osdmaps.
  */
-struct ceph_osdmap *osdmap_decode(void **p, void *end)
+static int get_osdmap_client_data_v(void **p, void *end,
+                                   const char *prefix, u8 *v)
 {
-       struct ceph_osdmap *map;
-       u16 version;
-       u32 len, max, i;
-       int err = -EINVAL;
-       void *start = *p;
-       struct ceph_pg_pool_info *pi;
+       u8 struct_v;
+
+       ceph_decode_8_safe(p, end, struct_v, e_inval);
+       if (struct_v >= 7) {
+               u8 struct_compat;
+
+               ceph_decode_8_safe(p, end, struct_compat, e_inval);
+               if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
+                       pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
+                                  struct_v, struct_compat,
+                                  OSDMAP_WRAPPER_COMPAT_VER, prefix);
+                       return -EINVAL;
+               }
+               *p += 4; /* ignore wrapper struct_len */
+
+               ceph_decode_8_safe(p, end, struct_v, e_inval);
+               ceph_decode_8_safe(p, end, struct_compat, e_inval);
+               if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
+                       pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
+                                  struct_v, struct_compat,
+                                  OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
+                       return -EINVAL;
+               }
+               *p += 4; /* ignore client data struct_len */
+       } else {
+               u16 version;
+
+               *p -= 1;
+               ceph_decode_16_safe(p, end, version, e_inval);
+               if (version < 6) {
+                       pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
+                                  prefix);
+                       return -EINVAL;
+               }
 
-       dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+               /* old osdmap enconding */
+               struct_v = 0;
+       }
 
-       map = kzalloc(sizeof(*map), GFP_NOFS);
-       if (map == NULL)
-               return ERR_PTR(-ENOMEM);
-       map->pg_temp = RB_ROOT;
+       *v = struct_v;
+       return 0;
 
-       ceph_decode_16_safe(p, end, version, bad);
-       if (version > 6) {
-               pr_warning("got unknown v %d > 6 of osdmap\n", version);
-               goto bad;
+e_inval:
+       return -EINVAL;
+}
+
+static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
+                         bool incremental)
+{
+       u32 n;
+
+       ceph_decode_32_safe(p, end, n, e_inval);
+       while (n--) {
+               struct ceph_pg_pool_info *pi;
+               u64 pool;
+               int ret;
+
+               ceph_decode_64_safe(p, end, pool, e_inval);
+
+               pi = __lookup_pg_pool(&map->pg_pools, pool);
+               if (!incremental || !pi) {
+                       pi = kzalloc(sizeof(*pi), GFP_NOFS);
+                       if (!pi)
+                               return -ENOMEM;
+
+                       pi->id = pool;
+
+                       ret = __insert_pg_pool(&map->pg_pools, pi);
+                       if (ret) {
+                               kfree(pi);
+                               return ret;
+                       }
+               }
+
+               ret = decode_pool(p, end, pi);
+               if (ret)
+                       return ret;
        }
-       if (version < 6) {
-               pr_warning("got old v %d < 6 of osdmap\n", version);
-               goto bad;
+
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
+{
+       return __decode_pools(p, end, map, false);
+}
+
+static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
+{
+       return __decode_pools(p, end, map, true);
+}
+
+static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
+                           bool incremental)
+{
+       u32 n;
+
+       ceph_decode_32_safe(p, end, n, e_inval);
+       while (n--) {
+               struct ceph_pg pgid;
+               u32 len, i;
+               int ret;
+
+               ret = ceph_decode_pgid(p, end, &pgid);
+               if (ret)
+                       return ret;
+
+               ceph_decode_32_safe(p, end, len, e_inval);
+
+               ret = __remove_pg_mapping(&map->pg_temp, pgid);
+               BUG_ON(!incremental && ret != -ENOENT);
+
+               if (!incremental || len > 0) {
+                       struct ceph_pg_mapping *pg;
+
+                       ceph_decode_need(p, end, len*sizeof(u32), e_inval);
+
+                       if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
+                               return -EINVAL;
+
+                       pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
+                       if (!pg)
+                               return -ENOMEM;
+
+                       pg->pgid = pgid;
+                       pg->pg_temp.len = len;
+                       for (i = 0; i < len; i++)
+                               pg->pg_temp.osds[i] = ceph_decode_32(p);
+
+                       ret = __insert_pg_mapping(pg, &map->pg_temp);
+                       if (ret) {
+                               kfree(pg);
+                               return ret;
+                       }
+               }
+       }
+
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+       return __decode_pg_temp(p, end, map, false);
+}
+
+static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+       return __decode_pg_temp(p, end, map, true);
+}
+
+static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
+                                bool incremental)
+{
+       u32 n;
+
+       ceph_decode_32_safe(p, end, n, e_inval);
+       while (n--) {
+               struct ceph_pg pgid;
+               u32 osd;
+               int ret;
+
+               ret = ceph_decode_pgid(p, end, &pgid);
+               if (ret)
+                       return ret;
+
+               ceph_decode_32_safe(p, end, osd, e_inval);
+
+               ret = __remove_pg_mapping(&map->primary_temp, pgid);
+               BUG_ON(!incremental && ret != -ENOENT);
+
+               if (!incremental || osd != (u32)-1) {
+                       struct ceph_pg_mapping *pg;
+
+                       pg = kzalloc(sizeof(*pg), GFP_NOFS);
+                       if (!pg)
+                               return -ENOMEM;
+
+                       pg->pgid = pgid;
+                       pg->primary_temp.osd = osd;
+
+                       ret = __insert_pg_mapping(pg, &map->primary_temp);
+                       if (ret) {
+                               kfree(pg);
+                               return ret;
+                       }
+               }
+       }
+
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
+{
+       return __decode_primary_temp(p, end, map, false);
+}
+
+static int decode_new_primary_temp(void **p, void *end,
+                                  struct ceph_osdmap *map)
+{
+       return __decode_primary_temp(p, end, map, true);
+}
+
+u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
+{
+       BUG_ON(osd >= map->max_osd);
+
+       if (!map->osd_primary_affinity)
+               return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+
+       return map->osd_primary_affinity[osd];
+}
+
+static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
+{
+       BUG_ON(osd >= map->max_osd);
+
+       if (!map->osd_primary_affinity) {
+               int i;
+
+               map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
+                                                   GFP_NOFS);
+               if (!map->osd_primary_affinity)
+                       return -ENOMEM;
+
+               for (i = 0; i < map->max_osd; i++)
+                       map->osd_primary_affinity[i] =
+                           CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
+       }
+
+       map->osd_primary_affinity[osd] = aff;
+
+       return 0;
+}
+
+static int decode_primary_affinity(void **p, void *end,
+                                  struct ceph_osdmap *map)
+{
+       u32 len, i;
+
+       ceph_decode_32_safe(p, end, len, e_inval);
+       if (len == 0) {
+               kfree(map->osd_primary_affinity);
+               map->osd_primary_affinity = NULL;
+               return 0;
+       }
+       if (len != map->max_osd)
+               goto e_inval;
+
+       ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
+
+       for (i = 0; i < map->max_osd; i++) {
+               int ret;
+
+               ret = set_primary_affinity(map, i, ceph_decode_32(p));
+               if (ret)
+                       return ret;
+       }
+
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+static int decode_new_primary_affinity(void **p, void *end,
+                                      struct ceph_osdmap *map)
+{
+       u32 n;
+
+       ceph_decode_32_safe(p, end, n, e_inval);
+       while (n--) {
+               u32 osd, aff;
+               int ret;
+
+               ceph_decode_32_safe(p, end, osd, e_inval);
+               ceph_decode_32_safe(p, end, aff, e_inval);
+
+               ret = set_primary_affinity(map, osd, aff);
+               if (ret)
+                       return ret;
        }
 
-       ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad);
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+/*
+ * decode a full map.
+ */
+static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
+{
+       u8 struct_v;
+       u32 epoch = 0;
+       void *start = *p;
+       u32 max;
+       u32 len, i;
+       int err;
+
+       dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
+
+       err = get_osdmap_client_data_v(p, end, "full", &struct_v);
+       if (err)
+               goto bad;
+
+       /* fsid, epoch, created, modified */
+       ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
+                        sizeof(map->created) + sizeof(map->modified), e_inval);
        ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
-       map->epoch = ceph_decode_32(p);
+       epoch = map->epoch = ceph_decode_32(p);
        ceph_decode_copy(p, &map->created, sizeof(map->created));
        ceph_decode_copy(p, &map->modified, sizeof(map->modified));
 
-       ceph_decode_32_safe(p, end, max, bad);
-       while (max--) {
-               ceph_decode_need(p, end, 8 + 2, bad);
-               err = -ENOMEM;
-               pi = kzalloc(sizeof(*pi), GFP_NOFS);
-               if (!pi)
-                       goto bad;
-               pi->id = ceph_decode_64(p);
-               err = __decode_pool(p, end, pi);
-               if (err < 0) {
-                       kfree(pi);
-                       goto bad;
-               }
-               __insert_pg_pool(&map->pg_pools, pi);
-       }
+       /* pools */
+       err = decode_pools(p, end, map);
+       if (err)
+               goto bad;
 
-       err = __decode_pool_names(p, end, map);
-       if (err < 0) {
-               dout("fail to decode pool names");
+       /* pool_name */
+       err = decode_pool_names(p, end, map);
+       if (err)
                goto bad;
-       }
 
-       ceph_decode_32_safe(p, end, map->pool_max, bad);
+       ceph_decode_32_safe(p, end, map->pool_max, e_inval);
 
-       ceph_decode_32_safe(p, end, map->flags, bad);
+       ceph_decode_32_safe(p, end, map->flags, e_inval);
 
-       max = ceph_decode_32(p);
+       /* max_osd */
+       ceph_decode_32_safe(p, end, max, e_inval);
 
        /* (re)alloc osd arrays */
        err = osdmap_set_max_osd(map, max);
-       if (err < 0)
+       if (err)
                goto bad;
-       dout("osdmap_decode max_osd = %d\n", map->max_osd);
 
-       /* osds */
-       err = -EINVAL;
+       /* osd_state, osd_weight, osd_addrs->client_addr */
        ceph_decode_need(p, end, 3*sizeof(u32) +
                         map->max_osd*(1 + sizeof(*map->osd_weight) +
-                                      sizeof(*map->osd_addr)), bad);
-       *p += 4; /* skip length field (should match max) */
+                                      sizeof(*map->osd_addr)), e_inval);
+
+       if (ceph_decode_32(p) != map->max_osd)
+               goto e_inval;
+
        ceph_decode_copy(p, map->osd_state, map->max_osd);
 
-       *p += 4; /* skip length field (should match max) */
+       if (ceph_decode_32(p) != map->max_osd)
+               goto e_inval;
+
        for (i = 0; i < map->max_osd; i++)
                map->osd_weight[i] = ceph_decode_32(p);
 
-       *p += 4; /* skip length field (should match max) */
+       if (ceph_decode_32(p) != map->max_osd)
+               goto e_inval;
+
        ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
        for (i = 0; i < map->max_osd; i++)
                ceph_decode_addr(&map->osd_addr[i]);
 
        /* pg_temp */
-       ceph_decode_32_safe(p, end, len, bad);
-       for (i = 0; i < len; i++) {
-               int n, j;
-               struct ceph_pg pgid;
-               struct ceph_pg_mapping *pg;
+       err = decode_pg_temp(p, end, map);
+       if (err)
+               goto bad;
 
-               err = ceph_decode_pgid(p, end, &pgid);
+       /* primary_temp */
+       if (struct_v >= 1) {
+               err = decode_primary_temp(p, end, map);
                if (err)
                        goto bad;
-               ceph_decode_need(p, end, sizeof(u32), bad);
-               n = ceph_decode_32(p);
-               err = -EINVAL;
-               if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
-                       goto bad;
-               ceph_decode_need(p, end, n * sizeof(u32), bad);
-               err = -ENOMEM;
-               pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
-               if (!pg)
-                       goto bad;
-               pg->pgid = pgid;
-               pg->len = n;
-               for (j = 0; j < n; j++)
-                       pg->osds[j] = ceph_decode_32(p);
+       }
 
-               err = __insert_pg_mapping(pg, &map->pg_temp);
+       /* primary_affinity */
+       if (struct_v >= 2) {
+               err = decode_primary_affinity(p, end, map);
                if (err)
                        goto bad;
-               dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed,
-                    len);
+       } else {
+               /* XXX can this happen? */
+               kfree(map->osd_primary_affinity);
+               map->osd_primary_affinity = NULL;
        }
 
        /* crush */
-       ceph_decode_32_safe(p, end, len, bad);
-       dout("osdmap_decode crush len %d from off 0x%x\n", len,
-            (int)(*p - start));
-       ceph_decode_need(p, end, len, bad);
-       map->crush = crush_decode(*p, end);
-       *p += len;
+       ceph_decode_32_safe(p, end, len, e_inval);
+       map->crush = crush_decode(*p, min(*p + len, end));
        if (IS_ERR(map->crush)) {
                err = PTR_ERR(map->crush);
                map->crush = NULL;
                goto bad;
        }
+       *p += len;
 
-       /* ignore the rest of the map */
+       /* ignore the rest */
        *p = end;
 
-       dout("osdmap_decode done %p %p\n", *p, end);
-       return map;
+       dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
+       return 0;
 
+e_inval:
+       err = -EINVAL;
 bad:
-       dout("osdmap_decode fail err %d\n", err);
-       ceph_osdmap_destroy(map);
-       return ERR_PTR(err);
+       pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
+              err, epoch, (int)(*p - start), *p, start, end);
+       print_hex_dump(KERN_DEBUG, "osdmap: ",
+                      DUMP_PREFIX_OFFSET, 16, 1,
+                      start, end - start, true);
+       return err;
+}
+
+/*
+ * Allocate and decode a full map.
+ */
+struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
+{
+       struct ceph_osdmap *map;
+       int ret;
+
+       map = kzalloc(sizeof(*map), GFP_NOFS);
+       if (!map)
+               return ERR_PTR(-ENOMEM);
+
+       map->pg_temp = RB_ROOT;
+       map->primary_temp = RB_ROOT;
+       mutex_init(&map->crush_scratch_mutex);
+
+       ret = osdmap_decode(p, end, map);
+       if (ret) {
+               ceph_osdmap_destroy(map);
+               return ERR_PTR(ret);
+       }
+
+       return map;
 }
 
 /*
@@ -840,17 +1175,18 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        __s64 new_pool_max;
        __s32 new_flags, max;
        void *start = *p;
-       int err = -EINVAL;
-       u16 version;
+       int err;
+       u8 struct_v;
+
+       dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
 
-       ceph_decode_16_safe(p, end, version, bad);
-       if (version != 6) {
-               pr_warning("got unknown v %d != 6 of inc osdmap\n", version);
+       err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
+       if (err)
                goto bad;
-       }
 
-       ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32),
-                        bad);
+       /* fsid, epoch, modified, new_pool_max, new_flags */
+       ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
+                        sizeof(u64) + sizeof(u32), e_inval);
        ceph_decode_copy(p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(p);
        BUG_ON(epoch != map->epoch+1);
@@ -859,21 +1195,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        new_flags = ceph_decode_32(p);
 
        /* full map? */
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        if (len > 0) {
                dout("apply_incremental full map len %d, %p to %p\n",
                     len, *p, end);
-               return osdmap_decode(p, min(*p+len, end));
+               return ceph_osdmap_decode(p, min(*p+len, end));
        }
 
        /* new crush? */
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        if (len > 0) {
-               dout("apply_incremental new crush map len %d, %p to %p\n",
-                    len, *p, end);
                newcrush = crush_decode(*p, min(*p+len, end));
-               if (IS_ERR(newcrush))
-                       return ERR_CAST(newcrush);
+               if (IS_ERR(newcrush)) {
+                       err = PTR_ERR(newcrush);
+                       newcrush = NULL;
+                       goto bad;
+               }
                *p += len;
        }
 
@@ -883,13 +1220,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        if (new_pool_max >= 0)
                map->pool_max = new_pool_max;
 
-       ceph_decode_need(p, end, 5*sizeof(u32), bad);
-
        /* new max? */
-       max = ceph_decode_32(p);
+       ceph_decode_32_safe(p, end, max, e_inval);
        if (max >= 0) {
                err = osdmap_set_max_osd(map, max);
-               if (err < 0)
+               if (err)
                        goto bad;
        }
 
@@ -902,51 +1237,34 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                newcrush = NULL;
        }
 
-       /* new_pool */
-       ceph_decode_32_safe(p, end, len, bad);
-       while (len--) {
-               struct ceph_pg_pool_info *pi;
+       /* new_pools */
+       err = decode_new_pools(p, end, map);
+       if (err)
+               goto bad;
 
-               ceph_decode_64_safe(p, end, pool, bad);
-               pi = __lookup_pg_pool(&map->pg_pools, pool);
-               if (!pi) {
-                       pi = kzalloc(sizeof(*pi), GFP_NOFS);
-                       if (!pi) {
-                               err = -ENOMEM;
-                               goto bad;
-                       }
-                       pi->id = pool;
-                       __insert_pg_pool(&map->pg_pools, pi);
-               }
-               err = __decode_pool(p, end, pi);
-               if (err < 0)
-                       goto bad;
-       }
-       if (version >= 5) {
-               err = __decode_pool_names(p, end, map);
-               if (err < 0)
-                       goto bad;
-       }
+       /* new_pool_names */
+       err = decode_pool_names(p, end, map);
+       if (err)
+               goto bad;
 
        /* old_pool */
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        while (len--) {
                struct ceph_pg_pool_info *pi;
 
-               ceph_decode_64_safe(p, end, pool, bad);
+               ceph_decode_64_safe(p, end, pool, e_inval);
                pi = __lookup_pg_pool(&map->pg_pools, pool);
                if (pi)
                        __remove_pg_pool(&map->pg_pools, pi);
        }
 
        /* new_up */
-       err = -EINVAL;
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        while (len--) {
                u32 osd;
                struct ceph_entity_addr addr;
-               ceph_decode_32_safe(p, end, osd, bad);
-               ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad);
+               ceph_decode_32_safe(p, end, osd, e_inval);
+               ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
                ceph_decode_addr(&addr);
                pr_info("osd%d up\n", osd);
                BUG_ON(osd >= map->max_osd);
@@ -955,11 +1273,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        }
 
        /* new_state */
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        while (len--) {
                u32 osd;
                u8 xorstate;
-               ceph_decode_32_safe(p, end, osd, bad);
+               ceph_decode_32_safe(p, end, osd, e_inval);
                xorstate = **(u8 **)p;
                (*p)++;  /* clean flag */
                if (xorstate == 0)
@@ -971,10 +1289,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        }
 
        /* new_weight */
-       ceph_decode_32_safe(p, end, len, bad);
+       ceph_decode_32_safe(p, end, len, e_inval);
        while (len--) {
                u32 osd, off;
-               ceph_decode_need(p, end, sizeof(u32)*2, bad);
+               ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
                osd = ceph_decode_32(p);
                off = ceph_decode_32(p);
                pr_info("osd%d weight 0x%x %s\n", osd, off,
@@ -985,56 +1303,35 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
        }
 
        /* new_pg_temp */
-       ceph_decode_32_safe(p, end, len, bad);
-       while (len--) {
-               struct ceph_pg_mapping *pg;
-               int j;
-               struct ceph_pg pgid;
-               u32 pglen;
+       err = decode_new_pg_temp(p, end, map);
+       if (err)
+               goto bad;
 
-               err = ceph_decode_pgid(p, end, &pgid);
+       /* new_primary_temp */
+       if (struct_v >= 1) {
+               err = decode_new_primary_temp(p, end, map);
                if (err)
                        goto bad;
-               ceph_decode_need(p, end, sizeof(u32), bad);
-               pglen = ceph_decode_32(p);
-               if (pglen) {
-                       ceph_decode_need(p, end, pglen*sizeof(u32), bad);
-
-                       /* removing existing (if any) */
-                       (void) __remove_pg_mapping(&map->pg_temp, pgid);
+       }
 
-                       /* insert */
-                       err = -EINVAL;
-                       if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
-                               goto bad;
-                       err = -ENOMEM;
-                       pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
-                       if (!pg)
-                               goto bad;
-                       pg->pgid = pgid;
-                       pg->len = pglen;
-                       for (j = 0; j < pglen; j++)
-                               pg->osds[j] = ceph_decode_32(p);
-                       err = __insert_pg_mapping(pg, &map->pg_temp);
-                       if (err) {
-                               kfree(pg);
-                               goto bad;
-                       }
-                       dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
-                            pgid.seed, pglen);
-               } else {
-                       /* remove */
-                       __remove_pg_mapping(&map->pg_temp, pgid);
-               }
+       /* new_primary_affinity */
+       if (struct_v >= 2) {
+               err = decode_new_primary_affinity(p, end, map);
+               if (err)
+                       goto bad;
        }
 
        /* ignore the rest */
        *p = end;
+
+       dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
        return map;
 
+e_inval:
+       err = -EINVAL;
 bad:
-       pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n",
-              epoch, (int)(*p - start), *p, start, end);
+       pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
+              err, epoch, (int)(*p - start), *p, start, end);
        print_hex_dump(KERN_DEBUG, "osdmap: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       start, end - start, true);
@@ -1142,14 +1439,20 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
 }
 EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
 
-static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x,
-                            int *result, int result_max,
-                            const __u32 *weight, int weight_max)
+static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
+                   int *result, int result_max,
+                   const __u32 *weight, int weight_max)
 {
-       int scratch[result_max * 3];
+       int r;
+
+       BUG_ON(result_max > CEPH_PG_MAX_SIZE);
 
-       return crush_do_rule(map, ruleno, x, result, result_max,
-                            weight, weight_max, scratch);
+       mutex_lock(&map->crush_scratch_mutex);
+       r = crush_do_rule(map->crush, ruleno, x, result, result_max,
+                         weight, weight_max, map->crush_scratch_ary);
+       mutex_unlock(&map->crush_scratch_mutex);
+
+       return r;
 }
 
 /*
@@ -1174,8 +1477,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
                                    pool->pg_num_mask);
        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
        if (pg) {
-               *num = pg->len;
-               return pg->osds;
+               *num = pg->pg_temp.len;
+               return pg->pg_temp.osds;
        }
 
        /* crush */
@@ -1205,9 +1508,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
                                      pool->pgp_num_mask) +
                        (unsigned)pgid.pool;
        }
-       r = crush_do_rule_ary(osdmap->crush, ruleno, pps,
-                             osds, min_t(int, pool->size, *num),
-                             osdmap->osd_weight, osdmap->max_osd);
+       r = do_crush(osdmap, ruleno, pps, osds, min_t(int, pool->size, *num),
+                    osdmap->osd_weight, osdmap->max_osd);
        if (r < 0) {
                pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
                       " size %d\n", r, pgid.pool, pool->crush_ruleset,
@@ -1219,24 +1521,186 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 }
 
 /*
- * Return acting set for given pgid.
+ * Calculate raw (crush) set for given pgid.
+ *
+ * Return raw set length, or error.
+ */
+static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
+                         struct ceph_pg_pool_info *pool,
+                         struct ceph_pg pgid, u32 pps, int *osds)
+{
+       int ruleno;
+       int len;
+
+       /* crush */
+       ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
+                                pool->type, pool->size);
+       if (ruleno < 0) {
+               pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
+                      pgid.pool, pool->crush_ruleset, pool->type,
+                      pool->size);
+               return -ENOENT;
+       }
+
+       len = do_crush(osdmap, ruleno, pps, osds,
+                      min_t(int, pool->size, CEPH_PG_MAX_SIZE),
+                      osdmap->osd_weight, osdmap->max_osd);
+       if (len < 0) {
+               pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
+                      len, ruleno, pgid.pool, pool->crush_ruleset,
+                      pool->type, pool->size);
+               return len;
+       }
+
+       return len;
+}
+
+/*
+ * Given raw set, calculate up set and up primary.
+ *
+ * Return up set length.  *primary is set to up primary osd id, or -1
+ * if up set is empty.
+ */
+static int raw_to_up_osds(struct ceph_osdmap *osdmap,
+                         struct ceph_pg_pool_info *pool,
+                         int *osds, int len, int *primary)
+{
+       int up_primary = -1;
+       int i;
+
+       if (ceph_can_shift_osds(pool)) {
+               int removed = 0;
+
+               for (i = 0; i < len; i++) {
+                       if (ceph_osd_is_down(osdmap, osds[i])) {
+                               removed++;
+                               continue;
+                       }
+                       if (removed)
+                               osds[i - removed] = osds[i];
+               }
+
+               len -= removed;
+               if (len > 0)
+                       up_primary = osds[0];
+       } else {
+               for (i = len - 1; i >= 0; i--) {
+                       if (ceph_osd_is_down(osdmap, osds[i]))
+                               osds[i] = CRUSH_ITEM_NONE;
+                       else
+                               up_primary = osds[i];
+               }
+       }
+
+       *primary = up_primary;
+       return len;
+}
+
+/*
+ * Given up set, apply pg_temp and primary_temp mappings.
+ *
+ * Return acting set length.  *primary is set to acting primary osd id,
+ * or -1 if acting set is empty.
+ */
+static int apply_temps(struct ceph_osdmap *osdmap,
+                      struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
+                      int *osds, int len, int *primary)
+{
+       struct ceph_pg_mapping *pg;
+       int temp_len;
+       int temp_primary;
+       int i;
+
+       /* raw_pg -> pg */
+       pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
+                                   pool->pg_num_mask);
+
+       /* pg_temp? */
+       pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
+       if (pg) {
+               temp_len = 0;
+               temp_primary = -1;
+
+               for (i = 0; i < pg->pg_temp.len; i++) {
+                       if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
+                               if (ceph_can_shift_osds(pool))
+                                       continue;
+                               else
+                                       osds[temp_len++] = CRUSH_ITEM_NONE;
+                       } else {
+                               osds[temp_len++] = pg->pg_temp.osds[i];
+                       }
+               }
+
+               /* apply pg_temp's primary */
+               for (i = 0; i < temp_len; i++) {
+                       if (osds[i] != CRUSH_ITEM_NONE) {
+                               temp_primary = osds[i];
+                               break;
+                       }
+               }
+       } else {
+               temp_len = len;
+               temp_primary = *primary;
+       }
+
+       /* primary_temp? */
+       pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
+       if (pg)
+               temp_primary = pg->primary_temp.osd;
+
+       *primary = temp_primary;
+       return temp_len;
+}
+
+/*
+ * Calculate acting set for given pgid.
+ *
+ * Return acting set length, or error.  *primary is set to acting
+ * primary osd id, or -1 if acting set is empty or on error.
  */
 int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
-                       int *acting)
+                       int *osds, int *primary)
 {
-       int rawosds[CEPH_PG_MAX_SIZE], *osds;
-       int i, o, num = CEPH_PG_MAX_SIZE;
+       struct ceph_pg_pool_info *pool;
+       u32 pps;
+       int len;
 
-       osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
-       if (!osds)
-               return -1;
+       pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
+       if (!pool) {
+               *primary = -1;
+               return -ENOENT;
+       }
 
-       /* primary is first up osd */
-       o = 0;
-       for (i = 0; i < num; i++)
-               if (ceph_osd_is_up(osdmap, osds[i]))
-                       acting[o++] = osds[i];
-       return o;
+       if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
+               /* hash pool id and seed so that pool PGs do not overlap */
+               pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
+                                    ceph_stable_mod(pgid.seed, pool->pgp_num,
+                                                    pool->pgp_num_mask),
+                                    pgid.pool);
+       } else {
+               /*
+                * legacy behavior: add ps and pool together.  this is
+                * not a great approach because the PGs from each pool
+                * will overlap on top of each other: 0.5 == 1.4 ==
+                * 2.3 == ...
+                */
+               pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
+                                     pool->pgp_num_mask) +
+                       (unsigned)pgid.pool;
+       }
+
+       len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
+       if (len < 0) {
+               *primary = -1;
+               return len;
+       }
+
+       len = raw_to_up_osds(osdmap, pool, osds, len, primary);
+
+       len = apply_temps(osdmap, pool, pgid, osds, len, primary);
+
+       return len;
 }
 
 /*