diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 68e3992e88381cd4974ebfa2da3400708ab4afa0..b6c8aaf4931bc8434635e74efa004f9ddf8c0304 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
 #include "drbd_int.h"
 #include "drbd_protocol.h"
 #include "drbd_req.h"
-
 #include "drbd_vli.h"
 
+#define PRO_FEATURES (FF_TRIM)
+
 struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
@@ -65,7 +66,7 @@ enum finish_epoch {
 static int drbd_do_features(struct drbd_connection *connection);
 static int drbd_do_auth(struct drbd_connection *connection);
 static int drbd_disconnected(struct drbd_peer_device *);
-
+static void conn_wait_active_ee_empty(struct drbd_connection *connection);
 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
 static int e_end_block(struct drbd_work *, int);
 
@@ -234,9 +235,17 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
 * @retry:     whether to retry if not enough pages are available right now
  *
 * Tries to allocate @number pages, first from our own page pool, then from
- * the kernel, unless this allocation would exceed the max_buffers setting.
+ * the kernel.
  * Possibly retry until DRBD frees sufficient pages somewhere else.
  *
+ * If this allocation would exceed the max_buffers setting, we throttle
+ * allocation (schedule_timeout) to give the system some room to breathe.
+ *
+ * We do not use max-buffers as a hard limit, because that could lead to
+ * congestion, and even to a distributed deadlock during online-verify or
+ * (checksum based) resync, if max-buffers, the socket buffer sizes and the
+ * resync-rate settings are misconfigured.
+ *
  * Returns a page chain linked via page->private.
  */
 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
@@ -246,10 +255,8 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
-       int mxb;
+       unsigned int mxb;
 
-       /* Yes, we may run up to @number over max_buffers. If we
-        * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
@@ -277,7 +284,8 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int
                        break;
                }
 
-               schedule();
+               if (schedule_timeout(HZ/10) == 0)
+                       mxb = UINT_MAX;
        }
        finish_wait(&drbd_pp_wait, &wait);
 
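The retry loop above is the interesting part: schedule_timeout(HZ/10) returns 0 when the 100 ms wait expired without a wakeup, and setting mxb = UINT_MAX then disables the soft limit for this allocation, so a misconfigured max-buffers can delay, but never deadlock, the receiver. Below is a minimal userspace sketch of the same idea, with hypothetical names (pages_in_use, alloc_throttled) standing in for the DRBD internals:

    #include <errno.h>
    #include <limits.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <time.h>

    static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  pool_wait = PTHREAD_COND_INITIALIZER;
    static unsigned int pages_in_use;  /* the freeing side (not shown) decrements and signals */

    static bool alloc_throttled(unsigned int number, unsigned int soft_limit)
    {
        pthread_mutex_lock(&pool_lock);
        while (pages_in_use + number > soft_limit) {
            struct timespec deadline;

            clock_gettime(CLOCK_REALTIME, &deadline);
            deadline.tv_nsec += 100 * 1000 * 1000;       /* ~HZ/10 */
            if (deadline.tv_nsec >= 1000000000L) {
                deadline.tv_sec += 1;
                deadline.tv_nsec -= 1000000000L;
            }
            /* timed out: treat the limit as advisory from now on */
            if (pthread_cond_timedwait(&pool_wait, &pool_lock, &deadline) == ETIMEDOUT)
                soft_limit = UINT_MAX;
        }
        pages_in_use += number;
        pthread_mutex_unlock(&pool_lock);
        return true;
    }
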
@@ -331,7 +339,7 @@ You must not have the req_lock:
 
 struct drbd_peer_request *
 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
-                   unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
+                   unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
 {
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
@@ -348,7 +356,7 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto
                return NULL;
        }
 
-       if (data_size) {
+       if (has_payload && data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
@@ -1026,24 +1034,27 @@ randomize:
        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;
 
+       /* Prevent a race between resync-handshake and
+        * being promoted to Primary.
+        *
+        * Grab and release the state mutex, so we know that any current
+        * drbd_set_role() is finished, and any incoming drbd_set_role()
+        * will see the STATE_SENT flag, and wait for it to be cleared.
+        */
+       idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
+               mutex_lock(peer_device->device->state_mutex);
+
        set_bit(STATE_SENT, &connection->flags);
 
+       idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
+               mutex_unlock(peer_device->device->state_mutex);
+
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();
 
-               /* Prevent a race between resync-handshake and
-                * being promoted to Primary.
-                *
-                * Grab and release the state mutex, so we know that any current
-                * drbd_set_role() is finished, and any incoming drbd_set_role
-                * will see the STATE_SENT flag, and wait for it to be cleared.
-                */
-               mutex_lock(device->state_mutex);
-               mutex_unlock(device->state_mutex);
-
                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
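Moving the mutex pair out of the per-device loop also changes its meaning slightly: the flag is now published while every device's state mutex is held, instead of locking and unlocking after the flag was already visible. A small pthreads sketch of the underlying idiom, with begin_handshake/set_role as hypothetical stand-ins for the DRBD functions:

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  state_cond  = PTHREAD_COND_INITIALIZER;
    static bool state_sent;

    /* handshake side: any set_role() in flight has either finished
     * before we got the lock, or will observe state_sent afterwards */
    static void begin_handshake(void)
    {
        pthread_mutex_lock(&state_mutex);
        state_sent = true;
        pthread_mutex_unlock(&state_mutex);
    }

    static void end_handshake(void)
    {
        pthread_mutex_lock(&state_mutex);
        state_sent = false;
        pthread_cond_broadcast(&state_cond);
        pthread_mutex_unlock(&state_mutex);
    }

    /* promotion side: wait until the handshake window has closed */
    static void set_role(void)
    {
        pthread_mutex_lock(&state_mutex);
        while (state_sent)
            pthread_cond_wait(&state_cond, &state_mutex);
        /* ... perform the role change ... */
        pthread_mutex_unlock(&state_mutex);
    }
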
@@ -1315,6 +1326,20 @@ int drbd_submit_peer_request(struct drbd_device *device,
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;
 
+       if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
+               /* wait for all pending IO completions before we start
+                * zeroing things out. */
+               conn_wait_active_ee_empty(first_peer_device(device)->connection);
+               if (blkdev_issue_zeroout(device->ldev->backing_bdev,
+                       sector, ds >> 9, GFP_NOIO))
+                       peer_req->flags |= EE_WAS_ERROR;
+               drbd_endio_write_sec_final(peer_req);
+               return 0;
+       }
+
+       if (peer_req->flags & EE_IS_TRIM)
+               nr_pages = 0; /* discards don't have any payload. */
+
        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
@@ -1326,7 +1351,7 @@ int drbd_submit_peer_request(struct drbd_device *device,
 next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
-               drbd_err(device, "submit_ee: Allocation of a bio failed\n");
+               drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
@@ -1340,6 +1365,11 @@ next_bio:
        bios = bio;
        ++n_bios;
 
+       if (rw & REQ_DISCARD) {
+               bio->bi_iter.bi_size = ds;
+               goto submit;
+       }
+
        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
@@ -1360,8 +1390,9 @@ next_bio:
                sector += len >> 9;
                --nr_pages;
        }
-       D_ASSERT(device, page == NULL);
        D_ASSERT(device, ds == 0);
+submit:
+       D_ASSERT(device, page == NULL);
 
        atomic_set(&peer_req->pending_bios, n_bios);
        do {
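For context, the loop this hunk bypasses splits one peer request into several bios whenever bio_add_page() refuses a page at the current offset; discards skip it because they carry a size but no pages. A simplified, self-contained version of that chunking, with a hypothetical max_chunk standing in for the lower-level restrictions:

    #include <stdint.h>

    struct chunk { uint64_t sector; uint32_t bytes; };

    /* split ds bytes starting at sector into chunks the backend
     * accepts; the caller submits all of them, as with n_bios above */
    static unsigned split_request(uint64_t sector, uint32_t ds,
                                  uint32_t max_chunk, struct chunk *out)
    {
        unsigned n = 0;

        while (ds > 0) {
            uint32_t len = ds < max_chunk ? ds : max_chunk;

            out[n].sector = sector;
            out[n].bytes  = len;
            n++;
            sector += len >> 9;   /* 512-byte sectors, as in the loop above */
            ds -= len;
        }
        return n;
    }
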
@@ -1490,19 +1521,21 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
  * and from receive_Data */
 static struct drbd_peer_request *
 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
-             int data_size) __must_hold(local)
+             struct packet_info *pi) __must_hold(local)
 {
        struct drbd_device *device = peer_device->device;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
+       int data_size = pi->size;
        void *dig_in = peer_device->connection->int_dig_in;
        void *dig_vv = peer_device->connection->int_dig_vv;
        unsigned long *data;
+       struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
 
        dgs = 0;
-       if (peer_device->connection->peer_integrity_tfm) {
+       if (!trim && peer_device->connection->peer_integrity_tfm) {
                dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
                /*
                 * FIXME: Receive the incoming digest into the receive buffer
@@ -1514,9 +1547,15 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                data_size -= dgs;
        }
 
+       if (trim) {
+               D_ASSERT(peer_device, data_size == 0);
+               data_size = be32_to_cpu(trim->size);
+       }
+
        if (!expect(IS_ALIGNED(data_size, 512)))
                return NULL;
-       if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
+       /* trim requests may be larger than DRBD_MAX_BIO_SIZE. */
+       if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
                return NULL;
 
        /* even though we trust our peer,
@@ -1532,11 +1571,11 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
-       peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
+       peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
        if (!peer_req)
                return NULL;
 
-       if (!data_size)
+       if (trim)
                return peer_req;
 
        ds = data_size;
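So for P_TRIM the byte count travels in the packet header rather than as payload. A runnable sketch of that decoding, using an illustrative struct layout (not the actual on-wire format) and ntohl() in place of be32_to_cpu():

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <stdio.h>

    struct wire_trim { uint32_t size_be; };  /* bytes to trim, big endian */

    int main(void)
    {
        struct wire_trim pkt = { .size_be = htonl(1 << 20) };  /* peer trims 1 MiB */
        uint32_t data_size = ntohl(pkt.size_be);

        if (data_size % 512) {  /* mirrors the IS_ALIGNED(data_size, 512) check */
            fprintf(stderr, "unaligned trim request\n");
            return 1;
        }
        printf("trim %u bytes (%u sectors)\n", data_size, data_size / 512);
        return 0;
    }
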
@@ -1676,12 +1715,12 @@ static int e_end_resync_block(struct drbd_work *w, int unused)
 }
 
 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
-                           int data_size) __releases(local)
+                           struct packet_info *pi) __releases(local)
 {
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
 
-       peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
+       peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
        if (!peer_req)
                goto fail;
 
@@ -1697,7 +1736,7 @@ static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t secto
        list_add(&peer_req->w.list, &device->sync_ee);
        spin_unlock_irq(&device->resource->req_lock);
 
-       atomic_add(data_size >> 9, &device->rs_sect_ev);
+       atomic_add(pi->size >> 9, &device->rs_sect_ev);
        if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
                return 0;
 
@@ -1785,7 +1824,7 @@ static int receive_RSDataReply(struct drbd_connection *connection, struct packet
                /* data is submitted to disk within recv_resync_read.
                 * corresponding put_ldev done below on error,
                 * or in drbd_peer_request_endio. */
-               err = recv_resync_read(peer_device, sector, pi->size);
+               err = recv_resync_read(peer_device, sector, pi);
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_err(device, "Can not write resync data to local disk.\n");
@@ -2196,7 +2235,7 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
         */
 
        sector = be64_to_cpu(p->sector);
-       peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
+       peer_req = read_in_block(peer_device, p->block_id, sector, pi);
        if (!peer_req) {
                put_ldev(device);
                return -EIO;
@@ -2206,7 +2245,15 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
 
        dp_flags = be32_to_cpu(p->dp_flags);
        rw |= wire_flags_to_bio(dp_flags);
-       if (peer_req->pages == NULL) {
+       if (pi->cmd == P_TRIM) {
+               struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
+               peer_req->flags |= EE_IS_TRIM;
+               if (!blk_queue_discard(q))
+                       peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
+               D_ASSERT(peer_device, peer_req->i.size > 0);
+               D_ASSERT(peer_device, rw & REQ_DISCARD);
+               D_ASSERT(peer_device, peer_req->pages == NULL);
+       } else if (peer_req->pages == NULL) {
                D_ASSERT(device, peer_req->i.size == 0);
                D_ASSERT(device, dp_flags & DP_FLUSH);
        }
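The point of EE_IS_TRIM_USE_ZEROOUT is that a peer without discard support must still end up with the same on-disk bytes, or online-verify and checksum-based resync would flag spurious differences. Here is a hedged userspace analogue of the same try-discard-else-zero decision, built on the block-layer ioctls rather than DRBD's in-kernel helpers:

    #include <linux/fs.h>     /* BLKDISCARD, BLKZEROOUT */
    #include <stdint.h>
    #include <sys/ioctl.h>

    /* try a real discard first; if the device refuses, write explicit
     * zeroes so both replicas read back identical data (error handling
     * trimmed for brevity) */
    static int trim_or_zeroout(int fd, uint64_t start, uint64_t len)
    {
        uint64_t range[2] = { start, len };

        if (ioctl(fd, BLKDISCARD, &range) == 0)
            return 0;
        return ioctl(fd, BLKZEROOUT, &range);
    }
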
@@ -2242,7 +2289,12 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
                update_peer_seq(peer_device, peer_seq);
                spin_lock_irq(&device->resource->req_lock);
        }
-       list_add(&peer_req->w.list, &device->active_ee);
+       /* if we use the zeroout fallback code, we process synchronously
+        * and wait for all pending requests, that is, for active_ee to
+        * become empty in drbd_submit_peer_request();
+        * better not to add ourselves here. */
+       if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
+               list_add(&peer_req->w.list, &device->active_ee);
        spin_unlock_irq(&device->resource->req_lock);
 
        if (device->state.conn == C_SYNC_TARGET)
@@ -2313,39 +2365,45 @@ out_interrupted:
  * The current sync rate used here uses only the most recent two step marks,
  * to have a short time average so we can react faster.
  */
-int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
+bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
 {
-       struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
-       unsigned long db, dt, dbdt;
        struct lc_element *tmp;
-       int curr_events;
-       int throttle = 0;
-       unsigned int c_min_rate;
-
-       rcu_read_lock();
-       c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
-       rcu_read_unlock();
+       bool throttle = true;
 
-       /* feature disabled? */
-       if (c_min_rate == 0)
-               return 0;
+       if (!drbd_rs_c_min_rate_throttle(device))
+               return false;
 
        spin_lock_irq(&device->al_lock);
        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
        if (tmp) {
                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
-               if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
-                       spin_unlock_irq(&device->al_lock);
-                       return 0;
-               }
+               if (test_bit(BME_PRIORITY, &bm_ext->flags))
+                       throttle = false;
                /* Do not slow down if app IO is already waiting for this extent */
        }
        spin_unlock_irq(&device->al_lock);
 
+       return throttle;
+}
+
+bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
+{
+       struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
+       unsigned long db, dt, dbdt;
+       unsigned int c_min_rate;
+       int curr_events;
+
+       rcu_read_lock();
+       c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
+       rcu_read_unlock();
+
+       /* feature disabled? */
+       if (c_min_rate == 0)
+               return false;
+
        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
                      (int)part_stat_read(&disk->part0, sectors[1]) -
                        atomic_read(&device->rs_sect_ev);
-
        if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
                unsigned long rs_left;
                int i;
@@ -2368,12 +2426,11 @@ int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
                dbdt = Bit2KB(db/dt);
 
                if (dbdt > c_min_rate)
-                       throttle = 1;
+                       return true;
        }
-       return throttle;
+       return false;
 }
 
-
 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
 {
        struct drbd_peer_device *peer_device;
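Splitting drbd_rs_c_min_rate_throttle() out makes the rate test reusable without the extent lookup. The arithmetic itself is simple: data moved since the last mark, divided by elapsed time, compared against the configured floor in KiB/s. A self-contained sketch, converting sectors to KiB directly where the kernel code converts bitmap bits via Bit2KB():

    #include <stdbool.h>
    #include <stdio.h>

    static bool should_throttle(unsigned long db_sectors,
                                unsigned long dt_seconds,
                                unsigned int c_min_rate_kib)
    {
        if (c_min_rate_kib == 0)
            return false;            /* feature disabled, as above */
        if (dt_seconds == 0)
            dt_seconds = 1;          /* avoid division by zero */
        /* sectors are 512 bytes, so KiB/s = sectors/s divided by 2 */
        return (db_sectors / dt_seconds) / 2 > c_min_rate_kib;
    }

    int main(void)
    {
        /* 51200 sectors (25 MiB) in 5 s is 5120 KiB/s: above a 4096 floor */
        printf("%d\n", should_throttle(51200, 5, 4096));  /* prints 1 */
        return 0;
    }
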
@@ -2436,7 +2493,8 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
-       peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
+       peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
+                       true /* has real payload */, GFP_NOIO);
        if (!peer_req) {
                put_ldev(device);
                return -ENOMEM;
@@ -3648,6 +3706,13 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
                put_ldev(device);
        }
 
+       device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
+       drbd_reconsider_max_bio_size(device);
+       /* Keep drbd_reconsider_max_bio_size() ahead of drbd_determine_dev_size().
+          In case we cleared the QUEUE_FLAG_DISCARD from our queue in
+          drbd_reconsider_max_bio_size(), we can be sure that after
+          drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
+
        ddsf = be16_to_cpu(p->dds_flags);
        if (get_ldev(device)) {
                dd = drbd_determine_dev_size(device, ddsf, NULL);
@@ -3660,9 +3725,6 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
                drbd_set_my_capacity(device, p_size);
        }
 
-       device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
-       drbd_reconsider_max_bio_size(device);
-
        if (get_ldev(device)) {
                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
@@ -4423,6 +4485,7 @@ static struct data_cmd drbd_cmd_handler[] = {
        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
+       [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
 };
 
 static void drbdd(struct drbd_connection *connection)
@@ -4630,6 +4693,7 @@ static int drbd_send_features(struct drbd_connection *connection)
        memset(p, 0, sizeof(*p));
        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
+       p->feature_flags = cpu_to_be32(PRO_FEATURES);
        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
 }
 
@@ -4683,10 +4747,14 @@ static int drbd_do_features(struct drbd_connection *connection)
                goto incompat;
 
        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
+       connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
 
        drbd_info(connection, "Handshake successful: "
             "Agreed network protocol version %d\n", connection->agreed_pro_version);
 
+       drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
+                 connection->agreed_features & FF_TRIM ? " " : " not ");
+
        return 1;
 
  incompat:
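Feature negotiation is just the intersection of two bit masks, so an older peer that sends no flags transparently turns TRIM off for the whole connection. A runnable sketch, assuming FF_TRIM's value of 1 from drbd_protocol.h:

    #include <stdint.h>
    #include <stdio.h>

    #define FF_TRIM      1          /* assumed from drbd_protocol.h */
    #define PRO_FEATURES (FF_TRIM)

    int main(void)
    {
        uint32_t peer_features = 0;  /* e.g. an old peer advertising nothing */
        uint32_t agreed = PRO_FEATURES & peer_features;

        printf("Agreed to%ssupport TRIM on protocol level\n",
               agreed & FF_TRIM ? " " : " not ");
        return 0;
    }
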
@@ -4778,6 +4846,12 @@ static int drbd_do_auth(struct drbd_connection *connection)
                goto fail;
        }
 
+       if (pi.size < CHALLENGE_LEN) {
+               drbd_err(connection, "AuthChallenge payload too small.\n");
+               rv = -1;
+               goto fail;
+       }
+
        peers_ch = kmalloc(pi.size, GFP_NOIO);
        if (peers_ch == NULL) {
                drbd_err(connection, "kmalloc of peers_ch failed\n");
@@ -4791,6 +4865,12 @@ static int drbd_do_auth(struct drbd_connection *connection)
                goto fail;
        }
 
+       if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
+               drbd_err(connection, "Peer presented the same challenge!\n");
+               rv = -1;
+               goto fail;
+       }
+
        resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
        response = kmalloc(resp_size, GFP_NOIO);
        if (response == NULL) {
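Both new checks harden the challenge-response exchange: the length check prevents reading past a short AuthChallenge payload, and the memcmp() rejects a peer (or man in the middle) that simply reflects our own challenge back, which would otherwise let our own HMAC response authenticate the connection to ourselves. A compact sketch of the two guards:

    #include <stdio.h>
    #include <string.h>

    #define CHALLENGE_LEN 64   /* as in the DRBD sources */

    static int check_peer_challenge(const unsigned char *mine,
                                    const unsigned char *peers, size_t peers_len)
    {
        if (peers_len < CHALLENGE_LEN)
            return -1;                        /* payload too small */
        if (!memcmp(mine, peers, CHALLENGE_LEN))
            return -1;                        /* peer reflected our challenge */
        return 0;
    }

    int main(void)
    {
        unsigned char mine[CHALLENGE_LEN] = "not-actually-random";
        printf("%d\n", check_peer_challenge(mine, mine, sizeof(mine)));  /* -1 */
        return 0;
    }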