rbd: rework rbd_request_fn()
authorIlya Dryomov <ilya.dryomov@inktank.com>
Mon, 4 Aug 2014 14:04:39 +0000 (18:04 +0400)
committerIlya Dryomov <ilya.dryomov@inktank.com>
Thu, 7 Aug 2014 10:56:20 +0000 (14:56 +0400)
While it was never a good idea to sleep in request_fn(), commit
34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
a *bad* idea.  mutex_lock() since 3.15 may reschedule *before* putting
task on the mutex wait queue, which for tasks in !TASK_RUNNING state
means block forever.  request_fn() may be called with !TASK_RUNNING on
the way to schedule() in io_schedule().

Offload request handling to a workqueue, one per rbd device, to avoid
calling blocking primitives from rbd_request_fn().

Fixes: http://tracker.ceph.com/issues/8818
Cc: stable@vger.kernel.org # 3.16, needs backporting for 3.15
Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Tested-by: Eric Eastman <eric0e@aol.com>
Tested-by: Greg Wilson <greg.wilson@keepertech.com>
Reviewed-by: Alex Elder <elder@linaro.org>
drivers/block/rbd.c

index cbc89fa9a6771fa7b48f42de1985478cf1af10eb..4515b128d0b48e02cf6fe961026682c38d275f9e 100644 (file)
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+       struct list_head        rq_queue;       /* incoming rq queue */
        spinlock_t              lock;           /* queue, flags, open_count */
+       struct workqueue_struct *rq_wq;
+       struct work_struct      rq_work;
 
        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
@@ -3176,102 +3180,129 @@ out:
        return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-               __releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-       struct rbd_device *rbd_dev = q->queuedata;
-       struct request *rq;
+       struct rbd_img_request *img_request;
+       u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+       u64 length = blk_rq_bytes(rq);
+       bool wr = rq_data_dir(rq) == WRITE;
        int result;
 
-       while ((rq = blk_fetch_request(q))) {
-               bool write_request = rq_data_dir(rq) == WRITE;
-               struct rbd_img_request *img_request;
-               u64 offset;
-               u64 length;
+       /* Ignore/skip any zero-length requests */
 
-               /* Ignore any non-FS requests that filter through. */
+       if (!length) {
+               dout("%s: zero-length request\n", __func__);
+               result = 0;
+               goto err_rq;
+       }
 
-               if (rq->cmd_type != REQ_TYPE_FS) {
-                       dout("%s: non-fs request type %d\n", __func__,
-                               (int) rq->cmd_type);
-                       __blk_end_request_all(rq, 0);
-                       continue;
+       /* Disallow writes to a read-only device */
+
+       if (wr) {
+               if (rbd_dev->mapping.read_only) {
+                       result = -EROFS;
+                       goto err_rq;
                }
+               rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+       }
 
-               /* Ignore/skip any zero-length requests */
+       /*
+        * Quit early if the mapped snapshot no longer exists.  It's
+        * still possible the snapshot will have disappeared by the
+        * time our request arrives at the osd, but there's no sense in
+        * sending it if we already know.
+        */
+       if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+               dout("request for non-existent snapshot");
+               rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+               result = -ENXIO;
+               goto err_rq;
+       }
 
-               offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-               length = (u64) blk_rq_bytes(rq);
+       if (offset && length > U64_MAX - offset + 1) {
+               rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+                        length);
+               result = -EINVAL;
+               goto err_rq;    /* Shouldn't happen */
+       }
 
-               if (!length) {
-                       dout("%s: zero-length request\n", __func__);
-                       __blk_end_request_all(rq, 0);
-                       continue;
-               }
+       if (offset + length > rbd_dev->mapping.size) {
+               rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+                        length, rbd_dev->mapping.size);
+               result = -EIO;
+               goto err_rq;
+       }
 
-               spin_unlock_irq(q->queue_lock);
+       img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+       if (!img_request) {
+               result = -ENOMEM;
+               goto err_rq;
+       }
+       img_request->rq = rq;
 
-               /* Disallow writes to a read-only device */
+       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+       if (result)
+               goto err_img_request;
 
-               if (write_request) {
-                       result = -EROFS;
-                       if (rbd_dev->mapping.read_only)
-                               goto end_request;
-                       rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-               }
+       result = rbd_img_request_submit(img_request);
+       if (result)
+               goto err_img_request;
 
-               /*
-                * Quit early if the mapped snapshot no longer
-                * exists.  It's still possible the snapshot will
-                * have disappeared by the time our request arrives
-                * at the osd, but there's no sense in sending it if
-                * we already know.
-                */
-               if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-                       dout("request for non-existent snapshot");
-                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-                       result = -ENXIO;
-                       goto end_request;
-               }
+       return;
 
-               result = -EINVAL;
-               if (offset && length > U64_MAX - offset + 1) {
-                       rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-                               offset, length);
-                       goto end_request;       /* Shouldn't happen */
-               }
+err_img_request:
+       rbd_img_request_put(img_request);
+err_rq:
+       if (result)
+               rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+                        wr ? "write" : "read", length, offset, result);
+       blk_end_request_all(rq, result);
+}
 
-               result = -EIO;
-               if (offset + length > rbd_dev->mapping.size) {
-                       rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-                               offset, length, rbd_dev->mapping.size);
-                       goto end_request;
-               }
+static void rbd_request_workfn(struct work_struct *work)
+{
+       struct rbd_device *rbd_dev =
+           container_of(work, struct rbd_device, rq_work);
+       struct request *rq, *next;
+       LIST_HEAD(requests);
 
-               result = -ENOMEM;
-               img_request = rbd_img_request_create(rbd_dev, offset, length,
-                                                       write_request);
-               if (!img_request)
-                       goto end_request;
+       spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+       list_splice_init(&rbd_dev->rq_queue, &requests);
+       spin_unlock_irq(&rbd_dev->lock);
 
-               img_request->rq = rq;
+       list_for_each_entry_safe(rq, next, &requests, queuelist) {
+               list_del_init(&rq->queuelist);
+               rbd_handle_request(rbd_dev, rq);
+       }
+}
 
-               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                               rq->bio);
-               if (!result)
-                       result = rbd_img_request_submit(img_request);
-               if (result)
-                       rbd_img_request_put(img_request);
-end_request:
-               spin_lock_irq(q->queue_lock);
-               if (result < 0) {
-                       rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-                               write_request ? "write" : "read",
-                               length, offset, result);
-
-                       __blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+       struct rbd_device *rbd_dev = q->queuedata;
+       struct request *rq;
+       int queued = 0;
+
+       rbd_assert(rbd_dev);
+
+       while ((rq = blk_fetch_request(q))) {
+               /* Ignore any non-FS requests that filter through. */
+               if (rq->cmd_type != REQ_TYPE_FS) {
+                       dout("%s: non-fs request type %d\n", __func__,
+                               (int) rq->cmd_type);
+                       __blk_end_request_all(rq, 0);
+                       continue;
                }
+
+               list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+               queued++;
        }
+
+       if (queued)
+               queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                return NULL;
 
        spin_lock_init(&rbd_dev->lock);
+       INIT_LIST_HEAD(&rbd_dev->rq_queue);
+       INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
        rbd_dev->flags = 0;
        atomic_set(&rbd_dev->parent_ref, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
@@ -5051,12 +5084,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
+
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+       rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+       if (!rbd_dev->rq_wq)
+               goto err_out_mapping;
+
        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
-               goto err_out_mapping;
+               goto err_out_workqueue;
 
        /* Everything's ready.  Announce the disk to the world. */
 
@@ -5068,6 +5106,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
        return ret;
 
+err_out_workqueue:
+       destroy_workqueue(rbd_dev->rq_wq);
+       rbd_dev->rq_wq = NULL;
 err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5314,6 +5355,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+       destroy_workqueue(rbd_dev->rq_wq);
        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);