bcache: Refactor journalling flow control
authorKent Overstreet <kmo@daterainc.com>
Fri, 25 Oct 2013 00:07:04 +0000 (17:07 -0700)
committerKent Overstreet <kmo@daterainc.com>
Mon, 11 Nov 2013 05:56:02 +0000 (21:56 -0800)
Making things less asynchronous that don't need to be - bch_journal()
only has to block when the journal or journal entry is full, which is
emphatically not a fast path. So make it a normal function that just
returns when it finishes, to make the code and control flow easier to
follow.

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
drivers/md/bcache/btree.c
drivers/md/bcache/closure.h
drivers/md/bcache/journal.c
drivers/md/bcache/journal.h
drivers/md/bcache/movinggc.c
drivers/md/bcache/request.c
drivers/md/bcache/request.h

index f960607f1f25b0eb212b3da4af7f9d699a2707ec..777c01d67ef03dae9094d8f2df258c148e58a1ef 100644 (file)
@@ -2164,9 +2164,6 @@ int bch_btree_insert(struct btree_op *op, struct cache_set *c,
                }
        }
 
-       if (op->journal)
-               atomic_dec_bug(op->journal);
-       op->journal = NULL;
        return ret;
 }
 
index 00039924ea9dde4ff64f7510a44202d1a1330be0..ab011f03801f63d89d68cf18b6681612bfcdfeee 100644 (file)
@@ -642,7 +642,7 @@ do {                                                                        \
 #define continue_at_nobarrier(_cl, _fn, _wq)                           \
 do {                                                                   \
        set_closure_fn(_cl, _fn, _wq);                                  \
-       closure_queue(cl);                                              \
+       closure_queue(_cl);                                             \
        return;                                                         \
 } while (0)
 
index 1bdefdb1fa7160277f84591bf150dacc9100e86b..940e89e0d706ad813fdf3fc7f71d38810c6f3694 100644 (file)
@@ -318,7 +318,6 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
                        bch_keylist_push(&op->keys);
 
                        op->journal = i->pin;
-                       atomic_inc(op->journal);
 
                        ret = bch_btree_insert(op, s, &op->keys);
                        if (ret)
@@ -357,48 +356,35 @@ static void btree_flush_write(struct cache_set *c)
         * Try to find the btree node with that references the oldest journal
         * entry, best is our current candidate and is locked if non NULL:
         */
-       struct btree *b, *best = NULL;
-       unsigned iter;
+       struct btree *b, *best;
+       unsigned i;
+retry:
+       best = NULL;
+
+       for_each_cached_btree(b, c, i)
+               if (btree_current_write(b)->journal) {
+                       if (!best)
+                               best = b;
+                       else if (journal_pin_cmp(c,
+                                                btree_current_write(best),
+                                                btree_current_write(b))) {
+                               best = b;
+                       }
+               }
 
-       for_each_cached_btree(b, c, iter) {
-               if (!down_write_trylock(&b->lock))
-                       continue;
+       b = best;
+       if (b) {
+               rw_lock(true, b, b->level);
 
-               if (!btree_node_dirty(b) ||
-                   !btree_current_write(b)->journal) {
+               if (!btree_current_write(b)->journal) {
                        rw_unlock(true, b);
-                       continue;
+                       /* We raced */
+                       goto retry;
                }
 
-               if (!best)
-                       best = b;
-               else if (journal_pin_cmp(c,
-                                        btree_current_write(best),
-                                        btree_current_write(b))) {
-                       rw_unlock(true, best);
-                       best = b;
-               } else
-                       rw_unlock(true, b);
+               bch_btree_node_write(b, NULL);
+               rw_unlock(true, b);
        }
-
-       if (best)
-               goto out;
-
-       /* We can't find the best btree node, just pick the first */
-       list_for_each_entry(b, &c->btree_cache, list)
-               if (!b->level && btree_node_dirty(b)) {
-                       best = b;
-                       rw_lock(true, best, best->level);
-                       goto found;
-               }
-
-out:
-       if (!best)
-               return;
-found:
-       if (btree_node_dirty(best))
-               bch_btree_node_write(best, NULL);
-       rw_unlock(true, best);
 }
 
 #define last_seq(j)    ((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -494,7 +480,7 @@ static void journal_reclaim(struct cache_set *c)
                do_journal_discard(ca);
 
        if (c->journal.blocks_free)
-               return;
+               goto out;
 
        /*
         * Allocate:
@@ -520,7 +506,7 @@ static void journal_reclaim(struct cache_set *c)
 
        if (n)
                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
        if (!journal_full(&c->journal))
                __closure_wake_up(&c->journal.wait);
 }
@@ -659,7 +645,7 @@ static void journal_write(struct closure *cl)
        journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
        __releases(c->journal.lock)
 {
        struct closure *cl = &c->journal.io;
@@ -667,29 +653,59 @@ static void __journal_try_write(struct cache_set *c, bool noflush)
 
        w->need_write = true;
 
-       if (!closure_trylock(cl, &c->cl))
-               spin_unlock(&c->journal.lock);
-       else if (noflush && journal_full(&c->journal)) {
-               spin_unlock(&c->journal.lock);
-               continue_at(cl, journal_write, system_wq);
-       } else
+       if (closure_trylock(cl, &c->cl))
                journal_write_unlocked(cl);
+       else
+               spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c)   __journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+                                                   unsigned nkeys)
 {
-       struct journal_write *w;
+       size_t sectors;
+       struct closure cl;
 
-       if (CACHE_SYNC(&c->sb)) {
-               spin_lock(&c->journal.lock);
-               w = c->journal.cur;
+       closure_init_stack(&cl);
+
+       spin_lock(&c->journal.lock);
+
+       while (1) {
+               struct journal_write *w = c->journal.cur;
+
+               sectors = __set_blocks(w->data, w->data->keys + nkeys,
+                                      c) * c->sb.block_size;
+
+               if (sectors <= min_t(size_t,
+                                    c->journal.blocks_free * c->sb.block_size,
+                                    PAGE_SECTORS << JSET_BITS))
+                       return w;
+
+               /* XXX: tracepoint */
+               if (!journal_full(&c->journal)) {
+                       trace_bcache_journal_entry_full(c);
+
+                       /*
+                        * XXX: If we were inserting so many keys that they
+                        * won't fit in an _empty_ journal write, we'll
+                        * deadlock. For now, handle this in
+                        * bch_keylist_realloc() - but something to think about.
+                        */
+                       BUG_ON(!w->data->keys);
+
+                       closure_wait(&w->wait, &cl);
+                       journal_try_write(c); /* unlocks */
+               } else {
+                       trace_bcache_journal_full(c);
+
+                       closure_wait(&c->journal.wait, &cl);
+                       journal_reclaim(c);
+                       spin_unlock(&c->journal.lock);
 
-               if (cl)
-                       BUG_ON(!closure_wait(&w->wait, cl));
+                       btree_flush_write(c);
+               }
 
-               __journal_try_write(c, true);
+               closure_sync(&cl);
+               spin_lock(&c->journal.lock);
        }
 }
 
@@ -708,68 +724,26 @@ static void journal_write_work(struct work_struct *work)
  * bch_journal() hands those same keys off to btree_insert_async()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+                     struct keylist *keys,
+                     struct closure *parent)
 {
-       struct btree_op *op = container_of(cl, struct btree_op, cl);
-       struct cache_set *c = op->c;
        struct journal_write *w;
-       size_t sectors, nkeys;
-
-       if (op->type != BTREE_INSERT ||
-           !CACHE_SYNC(&c->sb))
-               goto out;
-
-       /*
-        * If we're looping because we errored, might already be waiting on
-        * another journal write:
-        */
-       while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-               closure_sync(cl->parent);
-
-       spin_lock(&c->journal.lock);
-
-       if (journal_full(&c->journal)) {
-               trace_bcache_journal_full(c);
-
-               closure_wait(&c->journal.wait, cl);
-
-               journal_reclaim(c);
-               spin_unlock(&c->journal.lock);
-
-               btree_flush_write(c);
-               continue_at(cl, bch_journal, bcache_wq);
-       }
+       atomic_t *ret;
 
-       w = c->journal.cur;
-       nkeys = w->data->keys + bch_keylist_nkeys(&op->keys);
-       sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
+       if (!CACHE_SYNC(&c->sb))
+               return NULL;
 
-       if (sectors > min_t(size_t,
-                           c->journal.blocks_free * c->sb.block_size,
-                           PAGE_SECTORS << JSET_BITS)) {
-               trace_bcache_journal_entry_full(c);
+       w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 
-               /*
-                * XXX: If we were inserting so many keys that they won't fit in
-                * an _empty_ journal write, we'll deadlock. For now, handle
-                * this in bch_keylist_realloc() - but something to think about.
-                */
-               BUG_ON(!w->data->keys);
+       memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+       w->data->keys += bch_keylist_nkeys(keys);
 
-               BUG_ON(!closure_wait(&w->wait, cl));
+       ret = &fifo_back(&c->journal.pin);
+       atomic_inc(ret);
 
-               journal_try_write(c);
-               continue_at(cl, bch_journal, bcache_wq);
-       }
-
-       memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
-       w->data->keys += bch_keylist_nkeys(&op->keys);
-
-       op->journal = &fifo_back(&c->journal.pin);
-       atomic_inc(op->journal);
-
-       if (op->flush_journal) {
-               closure_wait(&w->wait, cl->parent);
+       if (parent) {
+               closure_wait(&w->wait, parent);
                journal_try_write(c);
        } else if (!w->need_write) {
                schedule_delayed_work(&c->journal.work,
@@ -778,8 +752,21 @@ void bch_journal(struct closure *cl)
        } else {
                spin_unlock(&c->journal.lock);
        }
-out:
-       bch_btree_insert_async(cl);
+
+
+       return ret;
+}
+
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+       struct keylist keys;
+       atomic_t *ref;
+
+       bch_keylist_init(&keys);
+
+       ref = bch_journal(c, &keys, cl);
+       if (ref)
+               atomic_dec_bug(ref);
 }
 
 void bch_journal_free(struct cache_set *c)
index 3ca93d3d566cea572e9fd8caf462a807033cc013..7045e6fd2d5a217a48574395c4dcf4c77298da3b 100644 (file)
@@ -200,8 +200,9 @@ struct journal_device {
 struct closure;
 struct cache_set;
 struct btree_op;
+struct keylist;
 
-void bch_journal(struct closure *);
+atomic_t *bch_journal(struct cache_set *, struct keylist *, struct closure *);
 void bch_journal_next(struct journal *);
 void bch_journal_mark(struct cache_set *, struct list_head *);
 void bch_journal_meta(struct cache_set *, struct closure *);
index dd8a035c5ae1c9f95b9cf41c07a70489dd5f785a..2c42377a65aad993388851ebc20c430ae1fdf47c 100644 (file)
@@ -110,7 +110,7 @@ static void write_moving(struct closure *cl)
                bkey_copy(&s->op.replace, &io->w->key);
 
                closure_init(&s->op.cl, cl);
-               bch_insert_data(&s->op.cl);
+               bch_data_insert(&s->op.cl);
        }
 
        continue_at(cl, write_moving_finish, bch_gc_wq);
index 3b85f33ae4c76603fff2014f7b9e94790719a20c..1c3af44b097b710c77dc71401f8ab422c8fe9966 100644 (file)
@@ -25,6 +25,8 @@
 
 struct kmem_cache *bch_search_cache;
 
+static void bch_data_insert_start(struct closure *);
+
 /* Cgroup interface */
 
 #ifdef CONFIG_CGROUP_BCACHE
@@ -211,31 +213,42 @@ static void bio_csum(struct bio *bio, struct bkey *k)
 
 /* Insert data into cache */
 
-static void bio_invalidate(struct closure *cl)
+static void bch_data_insert_keys(struct closure *cl)
 {
        struct btree_op *op = container_of(cl, struct btree_op, cl);
-       struct bio *bio = op->cache_bio;
+       struct search *s = container_of(op, struct search, op);
 
-       pr_debug("invalidating %i sectors from %llu",
-                bio_sectors(bio), (uint64_t) bio->bi_sector);
+       /*
+        * If we're looping, might already be waiting on
+        * another journal write - can't wait on more than one journal write at
+        * a time
+        *
+        * XXX: this looks wrong
+        */
+#if 0
+       while (atomic_read(&s->cl.remaining) & CLOSURE_WAITING)
+               closure_sync(&s->cl);
+#endif
 
-       while (bio_sectors(bio)) {
-               unsigned len = min(bio_sectors(bio), 1U << 14);
+       if (s->write)
+               op->journal = bch_journal(op->c, &op->keys,
+                                         op->flush_journal
+                                         ? &s->cl : NULL);
 
-               if (bch_keylist_realloc(&op->keys, 0, op->c))
-                       goto out;
+       if (bch_btree_insert(op, op->c, &op->keys)) {
+               s->error                = -ENOMEM;
+               op->insert_data_done    = true;
+       }
 
-               bio->bi_sector  += len;
-               bio->bi_size    -= len << 9;
+       if (op->journal)
+               atomic_dec_bug(op->journal);
+       op->journal = NULL;
 
-               bch_keylist_add(&op->keys,
-                               &KEY(op->inode, bio->bi_sector, len));
-       }
+       if (!op->insert_data_done)
+               continue_at(cl, bch_data_insert_start, bcache_wq);
 
-       op->insert_data_done = true;
-       bio_put(bio);
-out:
-       continue_at(cl, bch_journal, bcache_wq);
+       bch_keylist_free(&op->keys);
+       closure_return(cl);
 }
 
 struct open_bucket {
@@ -423,7 +436,34 @@ static bool bch_alloc_sectors(struct bkey *k, unsigned sectors,
        return true;
 }
 
-static void bch_insert_data_error(struct closure *cl)
+static void bch_data_invalidate(struct closure *cl)
+{
+       struct btree_op *op = container_of(cl, struct btree_op, cl);
+       struct bio *bio = op->cache_bio;
+
+       pr_debug("invalidating %i sectors from %llu",
+                bio_sectors(bio), (uint64_t) bio->bi_sector);
+
+       while (bio_sectors(bio)) {
+               unsigned len = min(bio_sectors(bio), 1U << 14);
+
+               if (bch_keylist_realloc(&op->keys, 0, op->c))
+                       goto out;
+
+               bio->bi_sector  += len;
+               bio->bi_size    -= len << 9;
+
+               bch_keylist_add(&op->keys, &KEY(op->inode,
+                                               bio->bi_sector, len));
+       }
+
+       op->insert_data_done = true;
+       bio_put(bio);
+out:
+       continue_at(cl, bch_data_insert_keys, bcache_wq);
+}
+
+static void bch_data_insert_error(struct closure *cl)
 {
        struct btree_op *op = container_of(cl, struct btree_op, cl);
 
@@ -450,10 +490,10 @@ static void bch_insert_data_error(struct closure *cl)
 
        op->keys.top = dst;
 
-       bch_journal(cl);
+       bch_data_insert_keys(cl);
 }
 
-static void bch_insert_data_endio(struct bio *bio, int error)
+static void bch_data_insert_endio(struct bio *bio, int error)
 {
        struct closure *cl = bio->bi_private;
        struct btree_op *op = container_of(cl, struct btree_op, cl);
@@ -464,7 +504,7 @@ static void bch_insert_data_endio(struct bio *bio, int error)
                if (s->writeback)
                        s->error = error;
                else if (s->write)
-                       set_closure_fn(cl, bch_insert_data_error, bcache_wq);
+                       set_closure_fn(cl, bch_data_insert_error, bcache_wq);
                else
                        set_closure_fn(cl, NULL, NULL);
        }
@@ -472,14 +512,14 @@ static void bch_insert_data_endio(struct bio *bio, int error)
        bch_bbio_endio(op->c, bio, error, "writing data to cache");
 }
 
-static void bch_insert_data_loop(struct closure *cl)
+static void bch_data_insert_start(struct closure *cl)
 {
        struct btree_op *op = container_of(cl, struct btree_op, cl);
        struct search *s = container_of(op, struct search, op);
        struct bio *bio = op->cache_bio, *n;
 
        if (op->bypass)
-               return bio_invalidate(cl);
+               return bch_data_invalidate(cl);
 
        if (atomic_sub_return(bio_sectors(bio), &op->c->sectors_to_gc) < 0) {
                set_gc_sectors(op->c);
@@ -502,7 +542,7 @@ static void bch_insert_data_loop(struct closure *cl)
                if (bch_keylist_realloc(&op->keys,
                                        1 + (op->csum ? 1 : 0),
                                        op->c))
-                       continue_at(cl, bch_journal, bcache_wq);
+                       continue_at(cl, bch_data_insert_keys, bcache_wq);
 
                k = op->keys.top;
                bkey_init(k);
@@ -514,7 +554,7 @@ static void bch_insert_data_loop(struct closure *cl)
 
                n = bch_bio_split(bio, KEY_SIZE(k), GFP_NOIO, split);
 
-               n->bi_end_io    = bch_insert_data_endio;
+               n->bi_end_io    = bch_data_insert_endio;
                n->bi_private   = cl;
 
                if (s->writeback) {
@@ -537,7 +577,7 @@ static void bch_insert_data_loop(struct closure *cl)
        } while (n != bio);
 
        op->insert_data_done = true;
-       continue_at(cl, bch_journal, bcache_wq);
+       continue_at(cl, bch_data_insert_keys, bcache_wq);
 err:
        /* bch_alloc_sectors() blocks if s->writeback = true */
        BUG_ON(s->writeback);
@@ -556,7 +596,7 @@ err:
                 * rest of the write.
                 */
                op->bypass = true;
-               return bio_invalidate(cl);
+               return bch_data_invalidate(cl);
        } else {
                /*
                 * From a cache miss, we can just insert the keys for the data
@@ -566,14 +606,14 @@ err:
                bio_put(bio);
 
                if (!bch_keylist_empty(&op->keys))
-                       continue_at(cl, bch_journal, bcache_wq);
+                       continue_at(cl, bch_data_insert_keys, bcache_wq);
                else
                        closure_return(cl);
        }
 }
 
 /**
- * bch_insert_data - stick some data in the cache
+ * bch_data_insert - stick some data in the cache
  *
  * This is the starting point for any data to end up in a cache device; it could
  * be from a normal write, or a writeback write, or a write to a flash only
@@ -591,30 +631,13 @@ err:
  * If op->bypass is true, instead of inserting the data it invalidates the
  * region of the cache represented by op->cache_bio and op->inode.
  */
-void bch_insert_data(struct closure *cl)
+void bch_data_insert(struct closure *cl)
 {
        struct btree_op *op = container_of(cl, struct btree_op, cl);
 
        bch_keylist_init(&op->keys);
        bio_get(op->cache_bio);
-       bch_insert_data_loop(cl);
-}
-
-void bch_btree_insert_async(struct closure *cl)
-{
-       struct btree_op *op = container_of(cl, struct btree_op, cl);
-       struct search *s = container_of(op, struct search, op);
-
-       if (bch_btree_insert(op, op->c, &op->keys)) {
-               s->error                = -ENOMEM;
-               op->insert_data_done    = true;
-       }
-
-       if (op->insert_data_done) {
-               bch_keylist_free(&op->keys);
-               closure_return(cl);
-       } else
-               continue_at(cl, bch_insert_data_loop, bcache_wq);
+       bch_data_insert_start(cl);
 }
 
 /* Common code for the make_request functions */
@@ -969,7 +992,7 @@ static void cached_dev_read_done(struct closure *cl)
        if (s->op.cache_bio &&
            !test_bit(CACHE_SET_STOPPING, &s->op.c->flags)) {
                s->op.type = BTREE_REPLACE;
-               closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+               closure_call(&s->op.cl, bch_data_insert, NULL, cl);
        }
 
        continue_at(cl, cached_dev_cache_miss_done, NULL);
@@ -1147,13 +1170,13 @@ static void cached_dev_write(struct cached_dev *dc, struct search *s)
                closure_bio_submit(bio, cl, s->d);
        }
 
-       closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+       closure_call(&s->op.cl, bch_data_insert, NULL, cl);
        continue_at(cl, cached_dev_write_complete, NULL);
 }
 
-static void cached_dev_nodata(struct cached_dev *dc, struct search *s)
+static void cached_dev_nodata(struct closure *cl)
 {
-       struct closure *cl = &s->cl;
+       struct search *s = container_of(cl, struct search, cl);
        struct bio *bio = &s->bio.bio;
 
        if (s->op.flush_journal)
@@ -1186,9 +1209,15 @@ static void cached_dev_make_request(struct request_queue *q, struct bio *bio)
                s = search_alloc(bio, d);
                trace_bcache_request_start(s, bio);
 
-               if (!bio->bi_size)
-                       cached_dev_nodata(dc, s);
-               else {
+               if (!bio->bi_size) {
+                       /*
+                        * can't call bch_journal_meta from under
+                        * generic_make_request
+                        */
+                       continue_at_nobarrier(&s->cl,
+                                             cached_dev_nodata,
+                                             bcache_wq);
+               } else {
                        s->op.bypass = check_should_bypass(dc, s);
 
                        if (rw)
@@ -1275,6 +1304,16 @@ static int flash_dev_cache_miss(struct btree *b, struct search *s,
        return 0;
 }
 
+static void flash_dev_nodata(struct closure *cl)
+{
+       struct search *s = container_of(cl, struct search, cl);
+
+       if (s->op.flush_journal)
+               bch_journal_meta(s->op.c, cl);
+
+       continue_at(cl, search_free, NULL);
+}
+
 static void flash_dev_make_request(struct request_queue *q, struct bio *bio)
 {
        struct search *s;
@@ -1294,8 +1333,13 @@ static void flash_dev_make_request(struct request_queue *q, struct bio *bio)
        trace_bcache_request_start(s, bio);
 
        if (!bio->bi_size) {
-               if (s->op.flush_journal)
-                       bch_journal_meta(s->op.c, cl);
+               /*
+                * can't call bch_journal_meta from under
+                * generic_make_request
+                */
+               continue_at_nobarrier(&s->cl,
+                                     flash_dev_nodata,
+                                     bcache_wq);
        } else if (rw) {
                bch_keybuf_check_overlapping(&s->op.c->moving_gc_keys,
                                        &KEY(d->id, bio->bi_sector, 0),
@@ -1305,7 +1349,7 @@ static void flash_dev_make_request(struct request_queue *q, struct bio *bio)
                s->writeback    = true;
                s->op.cache_bio = bio;
 
-               closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+               closure_call(&s->op.cl, bch_data_insert, NULL, cl);
        } else {
                closure_call(&s->op.cl, btree_read_async, NULL, cl);
        }
index 57dc4784f4f48c06b6816232d352d4992e45cee1..1f1b59d38db5b8d428200238c14886be89ceaf25 100644 (file)
@@ -31,8 +31,7 @@ struct search {
 
 void bch_cache_read_endio(struct bio *, int);
 unsigned bch_get_congested(struct cache_set *);
-void bch_insert_data(struct closure *cl);
-void bch_btree_insert_async(struct closure *);
+void bch_data_insert(struct closure *cl);
 void bch_cache_read_endio(struct bio *, int);
 
 void bch_open_buckets_free(struct cache_set *);