From 6d544bb4d9019c3a0d7ee4af1e4bbbd61a6e16dc Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Sun, 10 Dec 2006 02:20:54 -0800 Subject: [PATCH] [PATCH] dio: centralize completion in dio_complete() There have been a lot of bugs recently due to the way direct_io_worker() tries to decide how to finish direct IO operations. In the worst examples it has failed to call aio_complete() at all (hang) or called it too many times (oops). This set of patches cleans up the completion phase with the goal of removing the complexity that lead to these bugs. We end up with one path that calculates the result of the operation after all off the bios have completed. We decide when to generate a result of the operation using that path based on the final release of a refcount on the dio structure. I tried to progress towards the final state in steps that were relatively easy to understand. Each step should compile but I only tested the final result of having all the patches applied. I've tested these on low end PC drives with aio-stress, the direct IO tests I could manage to get running in LTP, orasim, and some home-brew functional tests. In http://lkml.org/lkml/2006/9/21/103 IBM reports success with ext2 and ext3 running DIO LTP tests. They found that XFS bug which has since been addressed in the patch series. This patch: The mechanics which decide the result of a direct IO operation were duplicated in the sync and async paths. The async path didn't check page_errors which can manifest as silently returning success when the final pointer in an operation faults and its matching file region is filled with zeros. The sync path and async path differed in whether they passed errors to the caller's dio->end_io operation. The async path was passing errors to it which trips an assertion in XFS, though it is apparently harmless. This centralizes the completion phase of dio ops in one place. AIO will now return EFAULT consistently and all paths fall back to the previously sync behaviour of passing the number of bytes 'transferred' to the dio->end_io callback, regardless of errors. dio_await_completion() doesn't have to propogate EIO from non-uptodate bios now that it's being propogated through dio_complete() via dio->io_error. This lets it return void which simplifies its sole caller. Signed-off-by: Zach Brown Cc: Badari Pulavarty Cc: Suparna Bhattacharya Acked-by: Jeff Moyer Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- fs/direct-io.c | 94 ++++++++++++++++++++++---------------------------- 1 file changed, 42 insertions(+), 52 deletions(-) diff --git a/fs/direct-io.c b/fs/direct-io.c index 45d34d807391..b57b671e1106 100644 --- a/fs/direct-io.c +++ b/fs/direct-io.c @@ -210,19 +210,46 @@ static struct page *dio_get_page(struct dio *dio) return dio->pages[dio->head++]; } -/* - * Called when all DIO BIO I/O has been completed - let the filesystem - * know, if it registered an interest earlier via get_block. Pass the - * private field of the map buffer_head so that filesystems can use it - * to hold additional state between get_block calls and dio_complete. +/** + * dio_complete() - called when all DIO BIO I/O has been completed + * @offset: the byte offset in the file of the completed operation + * + * This releases locks as dictated by the locking type, lets interested parties + * know that a DIO operation has completed, and calculates the resulting return + * code for the operation. + * + * It lets the filesystem know if it registered an interest earlier via + * get_block. Pass the private field of the map buffer_head so that + * filesystems can use it to hold additional state between get_block calls and + * dio_complete. */ -static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes) +static int dio_complete(struct dio *dio, loff_t offset, int ret) { + ssize_t transferred = 0; + + if (dio->result) { + transferred = dio->result; + + /* Check for short read case */ + if ((dio->rw == READ) && ((offset + transferred) > dio->i_size)) + transferred = dio->i_size - offset; + } + if (dio->end_io && dio->result) - dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private); + dio->end_io(dio->iocb, offset, transferred, + dio->map_bh.b_private); if (dio->lock_type == DIO_LOCKING) /* lockdep: non-owner release */ up_read_non_owner(&dio->inode->i_alloc_sem); + + if (ret == 0) + ret = dio->page_errors; + if (ret == 0) + ret = dio->io_error; + if (ret == 0) + ret = transferred; + + return ret; } /* @@ -236,8 +263,7 @@ static void finished_one_bio(struct dio *dio) spin_lock_irqsave(&dio->bio_lock, flags); if (dio->bio_count == 1) { if (dio->is_async) { - ssize_t transferred; - loff_t offset; + int ret; /* * Last reference to the dio is going away. @@ -245,24 +271,12 @@ static void finished_one_bio(struct dio *dio) */ spin_unlock_irqrestore(&dio->bio_lock, flags); - /* Check for short read case */ - transferred = dio->result; - offset = dio->iocb->ki_pos; - - if ((dio->rw == READ) && - ((offset + transferred) > dio->i_size)) - transferred = dio->i_size - offset; - - /* check for error in completion path */ - if (dio->io_error) - transferred = dio->io_error; - - dio_complete(dio, offset, transferred); + ret = dio_complete(dio, dio->iocb->ki_pos, 0); /* Complete AIO later if falling back to buffered i/o */ if (dio->result == dio->size || ((dio->rw == READ) && dio->result)) { - aio_complete(dio->iocb, transferred, 0); + aio_complete(dio->iocb, ret, 0); kfree(dio); return; } else { @@ -434,10 +448,8 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio) /* * Wait on and process all in-flight BIOs. */ -static int dio_await_completion(struct dio *dio) +static void dio_await_completion(struct dio *dio) { - int ret = 0; - if (dio->bio) dio_bio_submit(dio); @@ -448,13 +460,9 @@ static int dio_await_completion(struct dio *dio) */ while (dio->bio_count) { struct bio *bio = dio_await_one(dio); - int ret2; - - ret2 = dio_bio_complete(dio, bio); - if (ret == 0) - ret = ret2; + /* io errors are propogated through dio->io_error */ + dio_bio_complete(dio, bio); } - return ret; } /* @@ -1127,28 +1135,10 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, kfree(dio); } } else { - ssize_t transferred = 0; - finished_one_bio(dio); - ret2 = dio_await_completion(dio); - if (ret == 0) - ret = ret2; - if (ret == 0) - ret = dio->page_errors; - if (dio->result) { - loff_t i_size = i_size_read(inode); + dio_await_completion(dio); - transferred = dio->result; - /* - * Adjust the return value if the read crossed a - * non-block-aligned EOF. - */ - if (rw == READ && (offset + transferred > i_size)) - transferred = i_size - offset; - } - dio_complete(dio, offset, transferred); - if (ret == 0) - ret = transferred; + ret = dio_complete(dio, offset, ret); /* We could have also come here on an AIO file extend */ if (!is_sync_kiocb(iocb) && (rw & WRITE) && -- 2.34.1