drbd: Take a more conservative approach when deciding max_bio_size
author Philipp Reisner <philipp.reisner@linbit.com>
Fri, 20 May 2011 14:39:13 +0000 (16:39 +0200)
committer Philipp Reisner <philipp.reisner@linbit.com>
Tue, 24 May 2011 08:08:58 +0000 (10:08 +0200)
The old (optimistic) implementation could shrink the bio size
on a primary device.

Shrinking the bio size on a primary device is bad, since
we might still get BIOs with the old (bigger) size shortly after
we published the new size.

The new implementation is more conservative, and eventually
increases the max_bio_size on a primary device (which is valid).
It does so once it knows both the local limit and the remote limit.

 We cache the last seen max_bio_size of the peer in the meta
 data and rely on that value to make the operation of single
 nodes more efficient.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_nl.c
drivers/block/drbd/drbd_receiver.c

index 5c994739d11e3248625e00268d4607f23f2330f0..8aa10391115b0c1c183b50a86ecf6c1518f4c2e9 100644 (file)
@@ -1128,6 +1128,8 @@ struct drbd_conf {
        int rs_in_flight; /* resync sectors in flight (to proxy, in proxy and from proxy) */
        int rs_planed;    /* resync sectors already planned */
        atomic_t ap_in_flight; /* App sectors in flight (waiting for ack) */
+       int peer_max_bio_size;
+       int local_max_bio_size;
 };
 
 static inline struct drbd_conf *minor_to_mdev(unsigned int minor)
@@ -1433,6 +1435,7 @@ struct bm_extent {
  * hash table. */
 #define HT_SHIFT 8
 #define DRBD_MAX_BIO_SIZE (1U<<(9+HT_SHIFT))
+#define DRBD_MAX_BIO_SIZE_SAFE (1 << 12)       /* Works always = 4k */
 
 #define DRBD_MAX_SIZE_H80_PACKET (1 << 15) /* The old header only allows packets up to 32Kib data */
 
@@ -1519,7 +1522,7 @@ extern sector_t drbd_new_dev_size(struct drbd_conf *, struct drbd_backing_dev *,
 enum determine_dev_size { dev_size_error = -1, unchanged = 0, shrunk = 1, grew = 2 };
 extern enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *, enum dds_flags) __must_hold(local);
 extern void resync_after_online_grow(struct drbd_conf *);
-extern void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int) __must_hold(local);
+extern void drbd_reconsider_max_bio_size(struct drbd_conf *mdev);
 extern enum drbd_state_rv drbd_set_role(struct drbd_conf *mdev,
                                        enum drbd_role new_role,
                                        int force);
index ce6a764e905b73312ab791f9c30939e63859ab54..cfeb13b5a216e9bcf7fc086082ef02afbaf41329 100644 (file)
@@ -2071,7 +2071,7 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
 {
        struct p_sizes p;
        sector_t d_size, u_size;
-       int q_order_type;
+       int q_order_type, max_bio_size;
        int ok;
 
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -2079,17 +2079,20 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
                d_size = drbd_get_max_capacity(mdev->ldev);
                u_size = mdev->ldev->dc.disk_size;
                q_order_type = drbd_queue_order_type(mdev);
+               max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
+               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
                put_ldev(mdev);
        } else {
                d_size = 0;
                u_size = 0;
                q_order_type = QUEUE_ORDERED_NONE;
+               max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
        }
 
        p.d_size = cpu_to_be64(d_size);
        p.u_size = cpu_to_be64(u_size);
        p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
-       p.max_bio_size = cpu_to_be32(queue_max_hw_sectors(mdev->rq_queue) << 9);
+       p.max_bio_size = cpu_to_be32(max_bio_size);
        p.queue_order_type = cpu_to_be16(q_order_type);
        p.dds_flags = cpu_to_be16(flags);
 
@@ -3048,6 +3051,8 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        mdev->agreed_pro_version = PRO_VERSION_MAX;
        mdev->write_ordering = WO_bdev_flush;
        mdev->resync_wenr = LC_FREE;
+       mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
+       mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
 }
 
 void drbd_mdev_cleanup(struct drbd_conf *mdev)
@@ -3422,7 +3427,9 @@ struct drbd_conf *drbd_new_device(unsigned int minor)
        q->backing_dev_info.congested_data = mdev;
 
        blk_queue_make_request(q, drbd_make_request);
-       blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE >> 9);
+       /* Setting the max_hw_sectors to an odd value of 8kibyte here
+          This triggers a max_bio_size message upon first attach or connect */
+       blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
        blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
        blk_queue_merge_bvec(q, drbd_merge_bvec);
        q->queue_lock = &mdev->req_lock;
@@ -3634,7 +3641,8 @@ struct meta_data_on_disk {
              /* `-- act_log->nr_elements <-- sync_conf.al_extents */
        u32 bm_offset;         /* offset to the bitmap, from here */
        u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
-       u32 reserved_u32[4];
+       u32 la_peer_max_bio_size;   /* last peer max_bio_size */
+       u32 reserved_u32[3];
 
 } __packed;
 
@@ -3675,6 +3683,7 @@ void drbd_md_sync(struct drbd_conf *mdev)
        buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
 
        buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
+       buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
 
        D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
        sector = mdev->ldev->md.md_offset;
@@ -3758,6 +3767,15 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
        bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
 
+       spin_lock_irq(&mdev->req_lock);
+       if (mdev->state.conn < C_CONNECTED) {
+               int peer;
+               peer = be32_to_cpu(buffer->la_peer_max_bio_size);
+               peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
+               mdev->peer_max_bio_size = peer;
+       }
+       spin_unlock_irq(&mdev->req_lock);
+
        if (mdev->sync_conf.al_extents < 7)
                mdev->sync_conf.al_extents = 127;
 
index 9dfe58a096252dc711d75dbc29f693278ddba9b4..7c64ec042124a117e3bd97c636d0d938caa42ca5 100644 (file)
@@ -278,8 +278,14 @@ static int _try_outdate_peer_async(void *data)
 
        /* Not using
           drbd_request_state(mdev, NS(pdsk, nps));
-          here, because we might were able to re-establish the connection in the
-          meantime.
+          here, because we might were able to re-establish the connection
+          in the meantime. This can only partially be solved in the state's
+          engine is_valid_state() and is_valid_state_transition()
+          functions.
+
+          nps can be D_INCONSISTENT, D_OUTDATED or D_UNKNOWN.
+          pdsk == D_INCONSISTENT while conn >= C_CONNECTED is valid,
+          therefore we have to have the pre state change check here.
        */
        spin_lock_irq(&mdev->req_lock);
        ns = mdev->state;
@@ -786,30 +792,78 @@ static int drbd_check_al_size(struct drbd_conf *mdev)
        return 0;
 }
 
-void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_bio_size) __must_hold(local)
+static void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_bio_size)
 {
        struct request_queue * const q = mdev->rq_queue;
-       struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
-       int max_segments = mdev->ldev->dc.max_bio_bvecs;
-       int max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
+       int max_hw_sectors = max_bio_size >> 9;
+       int max_segments = 0;
+
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
+
+               max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
+               max_segments = mdev->ldev->dc.max_bio_bvecs;
+               put_ldev(mdev);
+       }
 
        blk_queue_logical_block_size(q, 512);
        blk_queue_max_hw_sectors(q, max_hw_sectors);
        /* This is the workaround for "bio would need to, but cannot, be split" */
        blk_queue_max_segments(q, max_segments ? max_segments : BLK_MAX_SEGMENTS);
        blk_queue_segment_boundary(q, PAGE_CACHE_SIZE-1);
-       blk_queue_stack_limits(q, b);
 
-       dev_info(DEV, "max BIO size = %u\n", queue_max_hw_sectors(q) << 9);
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
 
-       if (q->backing_dev_info.ra_pages != b->backing_dev_info.ra_pages) {
-               dev_info(DEV, "Adjusting my ra_pages to backing device's (%lu -> %lu)\n",
-                    q->backing_dev_info.ra_pages,
-                    b->backing_dev_info.ra_pages);
-               q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
+               blk_queue_stack_limits(q, b);
+
+               if (q->backing_dev_info.ra_pages != b->backing_dev_info.ra_pages) {
+                       dev_info(DEV, "Adjusting my ra_pages to backing device's (%lu -> %lu)\n",
+                                q->backing_dev_info.ra_pages,
+                                b->backing_dev_info.ra_pages);
+                       q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
+               }
+               put_ldev(mdev);
        }
 }
 
+void drbd_reconsider_max_bio_size(struct drbd_conf *mdev)
+{
+       int now, new, local, peer;
+
+       now = queue_max_hw_sectors(mdev->rq_queue) << 9;
+       local = mdev->local_max_bio_size; /* Eventually last known value, from volatile memory */
+       peer = mdev->peer_max_bio_size; /* Eventually last known value, from meta data */
+
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               local = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
+               mdev->local_max_bio_size = local;
+               put_ldev(mdev);
+       }
+
+       /* We may ignore peer limits if the peer is modern enough.
+          Because new from 8.3.8 onwards the peer can use multiple
+          BIOs for a single peer_request */
+       if (mdev->state.conn >= C_CONNECTED) {
+               if (mdev->agreed_pro_version < 94)
+                       peer = mdev->peer_max_bio_size;
+               else if (mdev->agreed_pro_version == 94)
+                       peer = DRBD_MAX_SIZE_H80_PACKET;
+               else /* drbd 8.3.8 onwards */
+                       peer = DRBD_MAX_BIO_SIZE;
+       }
+
+       new = min_t(int, local, peer);
+
+       if (mdev->state.role == R_PRIMARY && new < now)
+               dev_err(DEV, "ASSERT FAILED new < now; (%d < %d)\n", new, now);
+
+       if (new != now)
+               dev_info(DEV, "max BIO size = %u\n", new);
+
+       drbd_setup_queue_param(mdev, new);
+}
+
 /* serialize deconfig (worker exiting, doing cleanup)
  * and reconfig (drbdsetup disk, drbdsetup net)
  *
@@ -878,7 +932,6 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        struct block_device *bdev;
        struct lru_cache *resync_lru = NULL;
        union drbd_state ns, os;
-       unsigned int max_bio_size;
        enum drbd_state_rv rv;
        int cp_discovered = 0;
        int logical_block_size;
@@ -1130,20 +1183,7 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        mdev->read_cnt = 0;
        mdev->writ_cnt = 0;
 
-       max_bio_size = DRBD_MAX_BIO_SIZE;
-       if (mdev->state.conn == C_CONNECTED) {
-               /* We are Primary, Connected, and now attach a new local
-                * backing store. We must not increase the user visible maximum
-                * bio size on this device to something the peer may not be
-                * able to handle. */
-               if (mdev->agreed_pro_version < 94)
-                       max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
-               else if (mdev->agreed_pro_version == 94)
-                       max_bio_size = DRBD_MAX_SIZE_H80_PACKET;
-               /* else: drbd 8.3.9 and later, stay with default */
-       }
-
-       drbd_setup_queue_param(mdev, max_bio_size);
+       drbd_reconsider_max_bio_size(mdev);
 
        /* If I am currently not R_PRIMARY,
         * but meta data primary indicator is set,
index b0b0ba345e83455e495f0430e17b6c245b10704a..6ea0a4b51ecee87aed144b3e4600453fcaa19a52 100644 (file)
@@ -899,11 +899,6 @@ retry:
 
        drbd_thread_start(&mdev->asender);
 
-       if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
-               drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
-               put_ldev(mdev);
-       }
-
        if (drbd_send_protocol(mdev) == -1)
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
@@ -2939,7 +2934,6 @@ static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned
 {
        struct p_sizes *p = &mdev->data.rbuf.sizes;
        enum determine_dev_size dd = unchanged;
-       unsigned int max_bio_size;
        sector_t p_size, p_usize, my_usize;
        int ldsc = 0; /* local disk size changed */
        enum dds_flags ddsf;
@@ -3004,23 +2998,15 @@ static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned
                drbd_set_my_capacity(mdev, p_size);
        }
 
+       mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
+       drbd_reconsider_max_bio_size(mdev);
+
        if (get_ldev(mdev)) {
                if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
                        mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
                        ldsc = 1;
                }
 
-               if (mdev->agreed_pro_version < 94)
-                       max_bio_size = be32_to_cpu(p->max_bio_size);
-               else if (mdev->agreed_pro_version == 94)
-                       max_bio_size = DRBD_MAX_SIZE_H80_PACKET;
-               else /* drbd 8.3.8 onwards */
-                       max_bio_size = DRBD_MAX_BIO_SIZE;
-
-               if (max_bio_size != queue_max_hw_sectors(mdev->rq_queue) << 9)
-                       drbd_setup_queue_param(mdev, max_bio_size);
-
-               drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
                put_ldev(mdev);
        }