From: Ilya Dryomov Date: Mon, 4 Aug 2014 14:04:39 +0000 (+0400) Subject: rbd: rework rbd_request_fn() X-Git-Tag: firefly_0821_release~176^2~3410^2~3 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=bc1ecc65a259fa9333dc8bd6a4ba0cf03b7d4bf8;p=firefly-linux-kernel-4.4.55.git rbd: rework rbd_request_fn() While it was never a good idea to sleep in request_fn(), commit 34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it a *bad* idea. mutex_lock() since 3.15 may reschedule *before* putting task on the mutex wait queue, which for tasks in !TASK_RUNNING state means block forever. request_fn() may be called with !TASK_RUNNING on the way to schedule() in io_schedule(). Offload request handling to a workqueue, one per rbd device, to avoid calling blocking primitives from rbd_request_fn(). Fixes: http://tracker.ceph.com/issues/8818 Cc: stable@vger.kernel.org # 3.16, needs backporting for 3.15 Signed-off-by: Ilya Dryomov Tested-by: Eric Eastman Tested-by: Greg Wilson Reviewed-by: Alex Elder --- diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cbc89fa9a677..4515b128d0b4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -42,6 +42,7 @@ #include #include #include +#include #include "rbd_types.h" @@ -332,7 +333,10 @@ struct rbd_device { char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ + struct list_head rq_queue; /* incoming rq queue */ spinlock_t lock; /* queue, flags, open_count */ + struct workqueue_struct *rq_wq; + struct work_struct rq_work; struct rbd_image_header header; unsigned long flags; /* possibly lock protected */ @@ -3176,102 +3180,129 @@ out: return ret; } -static void rbd_request_fn(struct request_queue *q) - __releases(q->queue_lock) __acquires(q->queue_lock) +static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) { - struct rbd_device *rbd_dev = q->queuedata; - struct request *rq; + struct rbd_img_request *img_request; + u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; + u64 length = blk_rq_bytes(rq); + bool wr = rq_data_dir(rq) == WRITE; int result; - while ((rq = blk_fetch_request(q))) { - bool write_request = rq_data_dir(rq) == WRITE; - struct rbd_img_request *img_request; - u64 offset; - u64 length; + /* Ignore/skip any zero-length requests */ - /* Ignore any non-FS requests that filter through. */ + if (!length) { + dout("%s: zero-length request\n", __func__); + result = 0; + goto err_rq; + } - if (rq->cmd_type != REQ_TYPE_FS) { - dout("%s: non-fs request type %d\n", __func__, - (int) rq->cmd_type); - __blk_end_request_all(rq, 0); - continue; + /* Disallow writes to a read-only device */ + + if (wr) { + if (rbd_dev->mapping.read_only) { + result = -EROFS; + goto err_rq; } + rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); + } - /* Ignore/skip any zero-length requests */ + /* + * Quit early if the mapped snapshot no longer exists. It's + * still possible the snapshot will have disappeared by the + * time our request arrives at the osd, but there's no sense in + * sending it if we already know. + */ + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { + dout("request for non-existent snapshot"); + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); + result = -ENXIO; + goto err_rq; + } - offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; - length = (u64) blk_rq_bytes(rq); + if (offset && length > U64_MAX - offset + 1) { + rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, + length); + result = -EINVAL; + goto err_rq; /* Shouldn't happen */ + } - if (!length) { - dout("%s: zero-length request\n", __func__); - __blk_end_request_all(rq, 0); - continue; - } + if (offset + length > rbd_dev->mapping.size) { + rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, + length, rbd_dev->mapping.size); + result = -EIO; + goto err_rq; + } - spin_unlock_irq(q->queue_lock); + img_request = rbd_img_request_create(rbd_dev, offset, length, wr); + if (!img_request) { + result = -ENOMEM; + goto err_rq; + } + img_request->rq = rq; - /* Disallow writes to a read-only device */ + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio); + if (result) + goto err_img_request; - if (write_request) { - result = -EROFS; - if (rbd_dev->mapping.read_only) - goto end_request; - rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); - } + result = rbd_img_request_submit(img_request); + if (result) + goto err_img_request; - /* - * Quit early if the mapped snapshot no longer - * exists. It's still possible the snapshot will - * have disappeared by the time our request arrives - * at the osd, but there's no sense in sending it if - * we already know. - */ - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { - dout("request for non-existent snapshot"); - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - result = -ENXIO; - goto end_request; - } + return; - result = -EINVAL; - if (offset && length > U64_MAX - offset + 1) { - rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", - offset, length); - goto end_request; /* Shouldn't happen */ - } +err_img_request: + rbd_img_request_put(img_request); +err_rq: + if (result) + rbd_warn(rbd_dev, "%s %llx at %llx result %d", + wr ? "write" : "read", length, offset, result); + blk_end_request_all(rq, result); +} - result = -EIO; - if (offset + length > rbd_dev->mapping.size) { - rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n", - offset, length, rbd_dev->mapping.size); - goto end_request; - } +static void rbd_request_workfn(struct work_struct *work) +{ + struct rbd_device *rbd_dev = + container_of(work, struct rbd_device, rq_work); + struct request *rq, *next; + LIST_HEAD(requests); - result = -ENOMEM; - img_request = rbd_img_request_create(rbd_dev, offset, length, - write_request); - if (!img_request) - goto end_request; + spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */ + list_splice_init(&rbd_dev->rq_queue, &requests); + spin_unlock_irq(&rbd_dev->lock); - img_request->rq = rq; + list_for_each_entry_safe(rq, next, &requests, queuelist) { + list_del_init(&rq->queuelist); + rbd_handle_request(rbd_dev, rq); + } +} - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (!result) - result = rbd_img_request_submit(img_request); - if (result) - rbd_img_request_put(img_request); -end_request: - spin_lock_irq(q->queue_lock); - if (result < 0) { - rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", - write_request ? "write" : "read", - length, offset, result); - - __blk_end_request_all(rq, result); +/* + * Called with q->queue_lock held and interrupts disabled, possibly on + * the way to schedule(). Do not sleep here! + */ +static void rbd_request_fn(struct request_queue *q) +{ + struct rbd_device *rbd_dev = q->queuedata; + struct request *rq; + int queued = 0; + + rbd_assert(rbd_dev); + + while ((rq = blk_fetch_request(q))) { + /* Ignore any non-FS requests that filter through. */ + if (rq->cmd_type != REQ_TYPE_FS) { + dout("%s: non-fs request type %d\n", __func__, + (int) rq->cmd_type); + __blk_end_request_all(rq, 0); + continue; } + + list_add_tail(&rq->queuelist, &rbd_dev->rq_queue); + queued++; } + + if (queued) + queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work); } /* @@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, return NULL; spin_lock_init(&rbd_dev->lock); + INIT_LIST_HEAD(&rbd_dev->rq_queue); + INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn); rbd_dev->flags = 0; atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); @@ -5051,12 +5084,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) ret = rbd_dev_mapping_set(rbd_dev); if (ret) goto err_out_disk; + set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); + rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0); + if (!rbd_dev->rq_wq) + goto err_out_mapping; + ret = rbd_bus_add_dev(rbd_dev); if (ret) - goto err_out_mapping; + goto err_out_workqueue; /* Everything's ready. Announce the disk to the world. */ @@ -5068,6 +5106,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) return ret; +err_out_workqueue: + destroy_workqueue(rbd_dev->rq_wq); + rbd_dev->rq_wq = NULL; err_out_mapping: rbd_dev_mapping_clear(rbd_dev); err_out_disk: @@ -5314,6 +5355,7 @@ static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + destroy_workqueue(rbd_dev->rq_wq); rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_dev_mapping_clear(rbd_dev);