drivers/md/bcache/btree.c
1 /*
2  * Copyright (C) 2010 Kent Overstreet <kent.overstreet@gmail.com>
3  *
4  * Uses a block device as cache for other block devices; optimized for SSDs.
5  * All allocation is done in buckets, which should match the erase block size
6  * of the device.
7  *
8  * Buckets containing cached data are kept on a heap sorted by priority;
9  * bucket priority is increased on cache hit, and periodically all the buckets
10  * on the heap have their priority scaled down. This currently is just used as
11  * an LRU but in the future should allow for more intelligent heuristics.
12  *
13  * Buckets have an 8 bit counter; freeing is accomplished by incrementing the
14  * counter. Garbage collection is used to remove stale pointers.
15  *
16  * Indexing is done via a btree; nodes are not necessarily fully sorted, rather
17  * as keys are inserted we only sort the pages that have not yet been written.
18  * When garbage collection is run, we resort the entire node.
19  *
20  * All configuration is done via sysfs; see Documentation/bcache.txt.
21  */
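
/*
 * Illustrative sketch only (not part of the driver): roughly how the
 * generation counters described above detect stale pointers. The name
 * below is hypothetical; the real helpers are ptr_stale() and
 * bch_inc_gen(), used later in this file, which also handle 8-bit
 * wraparound via gen_after().
 */
#if 0
static bool example_ptr_is_stale(uint8_t bucket_gen, uint8_t ptr_gen)
{
	/*
	 * A pointer records the bucket's gen at the time it was created;
	 * freeing the bucket bumps the bucket's gen, so a mismatch means
	 * the pointer is stale and garbage collection will drop it.
	 */
	return bucket_gen != ptr_gen;
}
#endif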
22
23 #include "bcache.h"
24 #include "btree.h"
25 #include "debug.h"
26 #include "request.h"
27
28 #include <linux/slab.h>
29 #include <linux/bitops.h>
30 #include <linux/hash.h>
31 #include <linux/prefetch.h>
32 #include <linux/random.h>
33 #include <linux/rcupdate.h>
34 #include <trace/events/bcache.h>
35
36 /*
37  * Todo:
38  * register_bcache: Return errors out to userspace correctly
39  *
40  * Writeback: don't undirty key until after a cache flush
41  *
42  * Create an iterator for key pointers
43  *
44  * On btree write error, mark bucket such that it won't be freed from the cache
45  *
46  * Journalling:
47  *   Check for bad keys in replay
48  *   Propagate barriers
49  *   Refcount journal entries in journal_replay
50  *
51  * Garbage collection:
52  *   Finish incremental gc
53  *   Gc should free old UUIDs, data for invalid UUIDs
54  *
55  * Provide a way to list backing device UUIDs we have data cached for, and
56  * probably how long it's been since we've seen them, and a way to invalidate
57  * dirty data for devices that will never be attached again
58  *
59  * Keep 1 min/5 min/15 min statistics of how busy a block device has been, so
60  * that based on that and how much dirty data we have we can keep writeback
61  * from being starved
62  *
63  * Add a tracepoint or somesuch to watch for writeback starvation
64  *
65  * When btree depth > 1 and splitting an interior node, we have to make sure
66  * alloc_bucket() cannot fail. This should be true but is not completely
67  * obvious.
68  *
69  * Make sure all allocations get charged to the root cgroup
70  *
71  * Plugging?
72  *
73  * If data write is less than hard sector size of ssd, round up offset in open
74  * bucket to the next whole sector
75  *
76  * Also lookup by cgroup in get_open_bucket()
77  *
78  * Superblock needs to be fleshed out for multiple cache devices
79  *
80  * Add a sysfs tunable for the number of writeback IOs in flight
81  *
82  * Add a sysfs tunable for the number of open data buckets
83  *
84  * IO tracking: Can we track when one process is doing io on behalf of another?
85  * IO tracking: Don't use just an average, weigh more recent stuff higher
86  *
87  * Test module load/unload
88  */
89
90 static const char * const op_types[] = {
91         "insert", "replace"
92 };
93
94 static const char *op_type(struct btree_op *op)
95 {
96         return op_types[op->type];
97 }
98
99 #define MAX_NEED_GC             64
100 #define MAX_SAVE_PRIO           72
101
102 #define PTR_DIRTY_BIT           (((uint64_t) 1 << 36))
103
104 #define PTR_HASH(c, k)                                                  \
105         (((k)->ptr[0] >> c->bucket_bits) | PTR_GEN(k, 0))
106
107 struct workqueue_struct *bch_gc_wq;
108 static struct workqueue_struct *btree_io_wq;
109
110 void bch_btree_op_init_stack(struct btree_op *op)
111 {
112         memset(op, 0, sizeof(struct btree_op));
113         closure_init_stack(&op->cl);
114         op->lock = -1;
115         bch_keylist_init(&op->keys);
116 }
117
118 /* Btree key manipulation */
119
120 static void bkey_put(struct cache_set *c, struct bkey *k, int level)
121 {
122         if ((level && KEY_OFFSET(k)) || !level)
123                 __bkey_put(c, k);
124 }
125
126 /* Btree IO */
127
128 static uint64_t btree_csum_set(struct btree *b, struct bset *i)
129 {
130         uint64_t crc = b->key.ptr[0];
131         void *data = (void *) i + 8, *end = end(i);
132
133         crc = bch_crc64_update(crc, data, end - data);
134         return crc ^ 0xffffffffffffffffULL;
135 }
136
137 static void btree_bio_endio(struct bio *bio, int error)
138 {
139         struct closure *cl = bio->bi_private;
140         struct btree *b = container_of(cl, struct btree, io.cl);
141
142         if (error)
143                 set_btree_node_io_error(b);
144
145         bch_bbio_count_io_errors(b->c, bio, error, (bio->bi_rw & WRITE)
146                                  ? "writing btree" : "reading btree");
147         closure_put(cl);
148 }
149
150 static void btree_bio_init(struct btree *b)
151 {
152         BUG_ON(b->bio);
153         b->bio = bch_bbio_alloc(b->c);
154
155         b->bio->bi_end_io       = btree_bio_endio;
156         b->bio->bi_private      = &b->io.cl;
157 }
158
159 void bch_btree_read_done(struct closure *cl)
160 {
161         struct btree *b = container_of(cl, struct btree, io.cl);
162         struct bset *i = b->sets[0].data;
163         struct btree_iter *iter = b->c->fill_iter;
164         const char *err = "bad btree header";
165         BUG_ON(b->nsets || b->written);
166
167         bch_bbio_free(b->bio, b->c);
168         b->bio = NULL;
169
170         mutex_lock(&b->c->fill_lock);
171         iter->used = 0;
172
173         if (btree_node_io_error(b) ||
174             !i->seq)
175                 goto err;
176
177         for (;
178              b->written < btree_blocks(b) && i->seq == b->sets[0].data->seq;
179              i = write_block(b)) {
180                 err = "unsupported bset version";
181                 if (i->version > BCACHE_BSET_VERSION)
182                         goto err;
183
184                 err = "bad btree header";
185                 if (b->written + set_blocks(i, b->c) > btree_blocks(b))
186                         goto err;
187
188                 err = "bad magic";
189                 if (i->magic != bset_magic(b->c))
190                         goto err;
191
192                 err = "bad checksum";
193                 switch (i->version) {
194                 case 0:
195                         if (i->csum != csum_set(i))
196                                 goto err;
197                         break;
198                 case BCACHE_BSET_VERSION:
199                         if (i->csum != btree_csum_set(b, i))
200                                 goto err;
201                         break;
202                 }
203
204                 err = "empty set";
205                 if (i != b->sets[0].data && !i->keys)
206                         goto err;
207
208                 bch_btree_iter_push(iter, i->start, end(i));
209
210                 b->written += set_blocks(i, b->c);
211         }
212
213         err = "corrupted btree";
214         for (i = write_block(b);
215              index(i, b) < btree_blocks(b);
216              i = ((void *) i) + block_bytes(b->c))
217                 if (i->seq == b->sets[0].data->seq)
218                         goto err;
219
220         bch_btree_sort_and_fix_extents(b, iter);
221
222         i = b->sets[0].data;
223         err = "short btree key";
224         if (b->sets[0].size &&
225             bkey_cmp(&b->key, &b->sets[0].end) < 0)
226                 goto err;
227
228         if (b->written < btree_blocks(b))
229                 bch_bset_init_next(b);
230 out:
231
232         mutex_unlock(&b->c->fill_lock);
233
234         spin_lock(&b->c->btree_read_time_lock);
235         bch_time_stats_update(&b->c->btree_read_time, b->io_start_time);
236         spin_unlock(&b->c->btree_read_time_lock);
237
238         smp_wmb(); /* read_done is our write lock */
239         set_btree_node_read_done(b);
240
241         closure_return(cl);
242 err:
243         set_btree_node_io_error(b);
244         bch_cache_set_error(b->c, "%s at bucket %zu, block %zu, %u keys",
245                             err, PTR_BUCKET_NR(b->c, &b->key, 0),
246                             index(i, b), i->keys);
247         goto out;
248 }
249
250 void bch_btree_read(struct btree *b)
251 {
252         BUG_ON(b->nsets || b->written);
253
254         if (!closure_trylock(&b->io.cl, &b->c->cl))
255                 BUG();
256
257         b->io_start_time = local_clock();
258
259         btree_bio_init(b);
260         b->bio->bi_rw   = REQ_META|READ_SYNC;
261         b->bio->bi_size = KEY_SIZE(&b->key) << 9;
262
263         bch_bio_map(b->bio, b->sets[0].data);
264
265         pr_debug("%s", pbtree(b));
266         trace_bcache_btree_read(b->bio);
267         bch_submit_bbio(b->bio, b->c, &b->key, 0);
268
269         continue_at(&b->io.cl, bch_btree_read_done, system_wq);
270 }
271
272 static void btree_complete_write(struct btree *b, struct btree_write *w)
273 {
274         if (w->prio_blocked &&
275             !atomic_sub_return(w->prio_blocked, &b->c->prio_blocked))
276                 wake_up(&b->c->alloc_wait);
277
278         if (w->journal) {
279                 atomic_dec_bug(w->journal);
280                 __closure_wake_up(&b->c->journal.wait);
281         }
282
283         if (w->owner)
284                 closure_put(w->owner);
285
286         w->prio_blocked = 0;
287         w->journal      = NULL;
288         w->owner        = NULL;
289 }
290
291 static void __btree_write_done(struct closure *cl)
292 {
293         struct btree *b = container_of(cl, struct btree, io.cl);
294         struct btree_write *w = btree_prev_write(b);
295
296         bch_bbio_free(b->bio, b->c);
297         b->bio = NULL;
298         btree_complete_write(b, w);
299
300         if (btree_node_dirty(b))
301                 queue_delayed_work(btree_io_wq, &b->work,
302                                    msecs_to_jiffies(30000));
303
304         closure_return(cl);
305 }
306
307 static void btree_write_done(struct closure *cl)
308 {
309         struct btree *b = container_of(cl, struct btree, io.cl);
310         struct bio_vec *bv;
311         int n;
312
313         __bio_for_each_segment(bv, b->bio, n, 0)
314                 __free_page(bv->bv_page);
315
316         __btree_write_done(cl);
317 }
318
319 static void do_btree_write(struct btree *b)
320 {
321         struct closure *cl = &b->io.cl;
322         struct bset *i = b->sets[b->nsets].data;
323         BKEY_PADDED(key) k;
324
325         i->version      = BCACHE_BSET_VERSION;
326         i->csum         = btree_csum_set(b, i);
327
328         btree_bio_init(b);
329         b->bio->bi_rw   = REQ_META|WRITE_SYNC|REQ_FUA;
330         b->bio->bi_size = set_blocks(i, b->c) * block_bytes(b->c);
331         bch_bio_map(b->bio, i);
332
333         /*
334          * If we're appending to a leaf node, we don't technically need FUA -
335          * this write just needs to be persisted before the next journal write,
336          * which will be marked FLUSH|FUA.
337          *
338          * Similarly if we're writing a new btree root - the pointer is going to
339          * be in the next journal entry.
340          *
341          * But if we're writing a new btree node (that isn't a root) or
342  * appending to a non-leaf btree node, we need either FUA or a flush
343          * when we write the parent with the new pointer. FUA is cheaper than a
344          * flush, and writes appending to leaf nodes aren't blocking anything so
345          * just make all btree node writes FUA to keep things sane.
346          */
347
348         bkey_copy(&k.key, &b->key);
349         SET_PTR_OFFSET(&k.key, 0, PTR_OFFSET(&k.key, 0) + bset_offset(b, i));
350
351         if (!bch_bio_alloc_pages(b->bio, GFP_NOIO)) {
352                 int j;
353                 struct bio_vec *bv;
354                 void *base = (void *) ((unsigned long) i & ~(PAGE_SIZE - 1));
355
356                 bio_for_each_segment(bv, b->bio, j)
357                         memcpy(page_address(bv->bv_page),
358                                base + j * PAGE_SIZE, PAGE_SIZE);
359
360                 trace_bcache_btree_write(b->bio);
361                 bch_submit_bbio(b->bio, b->c, &k.key, 0);
362
363                 continue_at(cl, btree_write_done, NULL);
364         } else {
365                 b->bio->bi_vcnt = 0;
366                 bch_bio_map(b->bio, i);
367
368                 trace_bcache_btree_write(b->bio);
369                 bch_submit_bbio(b->bio, b->c, &k.key, 0);
370
371                 closure_sync(cl);
372                 __btree_write_done(cl);
373         }
374 }
375
376 static void __btree_write(struct btree *b)
377 {
378         struct bset *i = b->sets[b->nsets].data;
379
380         BUG_ON(current->bio_list);
381
382         closure_lock(&b->io, &b->c->cl);
383         cancel_delayed_work(&b->work);
384
385         clear_bit(BTREE_NODE_dirty,      &b->flags);
386         change_bit(BTREE_NODE_write_idx, &b->flags);
387
388         bch_check_key_order(b, i);
389         BUG_ON(b->written && !i->keys);
390
391         do_btree_write(b);
392
393         pr_debug("%s block %i keys %i", pbtree(b), b->written, i->keys);
394
395         b->written += set_blocks(i, b->c);
396         atomic_long_add(set_blocks(i, b->c) * b->c->sb.block_size,
397                         &PTR_CACHE(b->c, &b->key, 0)->btree_sectors_written);
398
399         bch_btree_sort_lazy(b);
400
401         if (b->written < btree_blocks(b))
402                 bch_bset_init_next(b);
403 }
404
405 static void btree_write_work(struct work_struct *w)
406 {
407         struct btree *b = container_of(to_delayed_work(w), struct btree, work);
408
409         down_write(&b->lock);
410
411         if (btree_node_dirty(b))
412                 __btree_write(b);
413         up_write(&b->lock);
414 }
415
416 void bch_btree_write(struct btree *b, bool now, struct btree_op *op)
417 {
418         struct bset *i = b->sets[b->nsets].data;
419         struct btree_write *w = btree_current_write(b);
420
421         BUG_ON(b->written &&
422                (b->written >= btree_blocks(b) ||
423                 i->seq != b->sets[0].data->seq ||
424                 !i->keys));
425
426         if (!btree_node_dirty(b)) {
427                 set_btree_node_dirty(b);
428                 queue_delayed_work(btree_io_wq, &b->work,
429                                    msecs_to_jiffies(30000));
430         }
431
432         w->prio_blocked += b->prio_blocked;
433         b->prio_blocked = 0;
434
435         if (op && op->journal && !b->level) {
436                 if (w->journal &&
437                     journal_pin_cmp(b->c, w, op)) {
438                         atomic_dec_bug(w->journal);
439                         w->journal = NULL;
440                 }
441
442                 if (!w->journal) {
443                         w->journal = op->journal;
444                         atomic_inc(w->journal);
445                 }
446         }
447
448         if (current->bio_list)
449                 return;
450
451         /* Force write if set is too big */
452         if (now ||
453             b->level ||
454             set_bytes(i) > PAGE_SIZE - 48) {
455                 if (op && now) {
456                         /* Must wait on multiple writes */
457                         BUG_ON(w->owner);
458                         w->owner = &op->cl;
459                         closure_get(&op->cl);
460                 }
461
462                 __btree_write(b);
463         }
464         BUG_ON(!b->written);
465 }
466
467 /*
468  * Btree in memory cache - allocation/freeing
469  * mca -> memory cache
470  */
471
472 static void mca_reinit(struct btree *b)
473 {
474         unsigned i;
475
476         b->flags        = 0;
477         b->written      = 0;
478         b->nsets        = 0;
479
480         for (i = 0; i < MAX_BSETS; i++)
481                 b->sets[i].size = 0;
482         /*
483          * Second loop starts at 1 because b->sets[0]->data is the memory we
484          * allocated
485          */
486         for (i = 1; i < MAX_BSETS; i++)
487                 b->sets[i].data = NULL;
488 }
489
490 #define mca_reserve(c)  (((c->root && c->root->level)           \
491                           ? c->root->level : 1) * 8 + 16)
492 #define mca_can_free(c)                                         \
493         max_t(int, 0, c->bucket_cache_used - mca_reserve(c))
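
/*
 * Worked example (illustrative): with a root at level 2, mca_reserve()
 * is 2 * 8 + 16 = 32 nodes; with no root or a level-0 root it is
 * 1 * 8 + 16 = 24. mca_can_free() is then however many cached nodes the
 * set holds beyond that reserve (never negative).
 */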
494
495 static void mca_data_free(struct btree *b)
496 {
497         struct bset_tree *t = b->sets;
498         BUG_ON(!closure_is_unlocked(&b->io.cl));
499
500         if (bset_prev_bytes(b) < PAGE_SIZE)
501                 kfree(t->prev);
502         else
503                 free_pages((unsigned long) t->prev,
504                            get_order(bset_prev_bytes(b)));
505
506         if (bset_tree_bytes(b) < PAGE_SIZE)
507                 kfree(t->tree);
508         else
509                 free_pages((unsigned long) t->tree,
510                            get_order(bset_tree_bytes(b)));
511
512         free_pages((unsigned long) t->data, b->page_order);
513
514         t->prev = NULL;
515         t->tree = NULL;
516         t->data = NULL;
517         list_move(&b->list, &b->c->btree_cache_freed);
518         b->c->bucket_cache_used--;
519 }
520
521 static void mca_bucket_free(struct btree *b)
522 {
523         BUG_ON(btree_node_dirty(b));
524
525         b->key.ptr[0] = 0;
526         hlist_del_init_rcu(&b->hash);
527         list_move(&b->list, &b->c->btree_cache_freeable);
528 }
529
530 static unsigned btree_order(struct bkey *k)
531 {
532         return ilog2(KEY_SIZE(k) / PAGE_SECTORS ?: 1);
533 }
534
535 static void mca_data_alloc(struct btree *b, struct bkey *k, gfp_t gfp)
536 {
537         struct bset_tree *t = b->sets;
538         BUG_ON(t->data);
539
540         b->page_order = max_t(unsigned,
541                               ilog2(b->c->btree_pages),
542                               btree_order(k));
543
544         t->data = (void *) __get_free_pages(gfp, b->page_order);
545         if (!t->data)
546                 goto err;
547
548         t->tree = bset_tree_bytes(b) < PAGE_SIZE
549                 ? kmalloc(bset_tree_bytes(b), gfp)
550                 : (void *) __get_free_pages(gfp, get_order(bset_tree_bytes(b)));
551         if (!t->tree)
552                 goto err;
553
554         t->prev = bset_prev_bytes(b) < PAGE_SIZE
555                 ? kmalloc(bset_prev_bytes(b), gfp)
556                 : (void *) __get_free_pages(gfp, get_order(bset_prev_bytes(b)));
557         if (!t->prev)
558                 goto err;
559
560         list_move(&b->list, &b->c->btree_cache);
561         b->c->bucket_cache_used++;
562         return;
563 err:
564         mca_data_free(b);
565 }
566
567 static struct btree *mca_bucket_alloc(struct cache_set *c,
568                                       struct bkey *k, gfp_t gfp)
569 {
570         struct btree *b = kzalloc(sizeof(struct btree), gfp);
571         if (!b)
572                 return NULL;
573
574         init_rwsem(&b->lock);
575         lockdep_set_novalidate_class(&b->lock);
576         INIT_LIST_HEAD(&b->list);
577         INIT_DELAYED_WORK(&b->work, btree_write_work);
578         b->c = c;
579         closure_init_unlocked(&b->io);
580
581         mca_data_alloc(b, k, gfp);
582         return b;
583 }
584
585 static int mca_reap(struct btree *b, struct closure *cl, unsigned min_order)
586 {
587         lockdep_assert_held(&b->c->bucket_lock);
588
589         if (!down_write_trylock(&b->lock))
590                 return -ENOMEM;
591
592         if (b->page_order < min_order) {
593                 rw_unlock(true, b);
594                 return -ENOMEM;
595         }
596
597         BUG_ON(btree_node_dirty(b) && !b->sets[0].data);
598
599         if (cl && btree_node_dirty(b))
600                 bch_btree_write(b, true, NULL);
601
602         if (cl)
603                 closure_wait_event_async(&b->io.wait, cl,
604                          atomic_read(&b->io.cl.remaining) == -1);
605
606         if (btree_node_dirty(b) ||
607             !closure_is_unlocked(&b->io.cl) ||
608             work_pending(&b->work.work)) {
609                 rw_unlock(true, b);
610                 return -EAGAIN;
611         }
612
613         return 0;
614 }
615
616 static int bch_mca_shrink(struct shrinker *shrink, struct shrink_control *sc)
617 {
618         struct cache_set *c = container_of(shrink, struct cache_set, shrink);
619         struct btree *b, *t;
620         unsigned long i, nr = sc->nr_to_scan;
621
622         if (c->shrinker_disabled)
623                 return 0;
624
625         if (c->try_harder)
626                 return 0;
627
628         /*
629          * If nr == 0, we're supposed to return the number of items we have
630          * cached. Not allowed to return -1.
631          */
632         if (!nr)
633                 return mca_can_free(c) * c->btree_pages;
634
635         /* Return -1 if we can't do anything right now */
636         if (sc->gfp_mask & __GFP_IO)
637                 mutex_lock(&c->bucket_lock);
638         else if (!mutex_trylock(&c->bucket_lock))
639                 return -1;
640
641         nr /= c->btree_pages;
642         nr = min_t(unsigned long, nr, mca_can_free(c));
643
644         i = 0;
645         list_for_each_entry_safe(b, t, &c->btree_cache_freeable, list) {
646                 if (!nr)
647                         break;
648
649                 if (++i > 3 &&
650                     !mca_reap(b, NULL, 0)) {
651                         mca_data_free(b);
652                         rw_unlock(true, b);
653                         --nr;
654                 }
655         }
656
657         /*
658          * Can happen right when we first start up, before we've read in any
659          * btree nodes
660          */
661         if (list_empty(&c->btree_cache))
662                 goto out;
663
664         for (i = 0; nr && i < c->bucket_cache_used; i++) {
665                 b = list_first_entry(&c->btree_cache, struct btree, list);
666                 list_rotate_left(&c->btree_cache);
667
668                 if (!b->accessed &&
669                     !mca_reap(b, NULL, 0)) {
670                         mca_bucket_free(b);
671                         mca_data_free(b);
672                         rw_unlock(true, b);
673                         --nr;
674                 } else
675                         b->accessed = 0;
676         }
677 out:
678         nr = mca_can_free(c) * c->btree_pages;
679         mutex_unlock(&c->bucket_lock);
680         return nr;
681 }
682
683 void bch_btree_cache_free(struct cache_set *c)
684 {
685         struct btree *b;
686         struct closure cl;
687         closure_init_stack(&cl);
688
689         if (c->shrink.list.next)
690                 unregister_shrinker(&c->shrink);
691
692         mutex_lock(&c->bucket_lock);
693
694 #ifdef CONFIG_BCACHE_DEBUG
695         if (c->verify_data)
696                 list_move(&c->verify_data->list, &c->btree_cache);
697 #endif
698
699         list_splice(&c->btree_cache_freeable,
700                     &c->btree_cache);
701
702         while (!list_empty(&c->btree_cache)) {
703                 b = list_first_entry(&c->btree_cache, struct btree, list);
704
705                 if (btree_node_dirty(b))
706                         btree_complete_write(b, btree_current_write(b));
707                 clear_bit(BTREE_NODE_dirty, &b->flags);
708
709                 mca_data_free(b);
710         }
711
712         while (!list_empty(&c->btree_cache_freed)) {
713                 b = list_first_entry(&c->btree_cache_freed,
714                                      struct btree, list);
715                 list_del(&b->list);
716                 cancel_delayed_work_sync(&b->work);
717                 kfree(b);
718         }
719
720         mutex_unlock(&c->bucket_lock);
721 }
722
723 int bch_btree_cache_alloc(struct cache_set *c)
724 {
725         unsigned i;
726
727         /* XXX: doesn't check for errors */
728
729         closure_init_unlocked(&c->gc);
730
731         for (i = 0; i < mca_reserve(c); i++)
732                 mca_bucket_alloc(c, &ZERO_KEY, GFP_KERNEL);
733
734         list_splice_init(&c->btree_cache,
735                          &c->btree_cache_freeable);
736
737 #ifdef CONFIG_BCACHE_DEBUG
738         mutex_init(&c->verify_lock);
739
740         c->verify_data = mca_bucket_alloc(c, &ZERO_KEY, GFP_KERNEL);
741
742         if (c->verify_data &&
743             c->verify_data->sets[0].data)
744                 list_del_init(&c->verify_data->list);
745         else
746                 c->verify_data = NULL;
747 #endif
748
749         c->shrink.shrink = bch_mca_shrink;
750         c->shrink.seeks = 4;
751         c->shrink.batch = c->btree_pages * 2;
752         register_shrinker(&c->shrink);
753
754         return 0;
755 }
756
757 /* Btree in memory cache - hash table */
758
759 static struct hlist_head *mca_hash(struct cache_set *c, struct bkey *k)
760 {
761         return &c->bucket_hash[hash_32(PTR_HASH(c, k), BUCKET_HASH_BITS)];
762 }
763
764 static struct btree *mca_find(struct cache_set *c, struct bkey *k)
765 {
766         struct btree *b;
767
768         rcu_read_lock();
769         hlist_for_each_entry_rcu(b, mca_hash(c, k), hash)
770                 if (PTR_HASH(c, &b->key) == PTR_HASH(c, k))
771                         goto out;
772         b = NULL;
773 out:
774         rcu_read_unlock();
775         return b;
776 }
777
778 static struct btree *mca_cannibalize(struct cache_set *c, struct bkey *k,
779                                      int level, struct closure *cl)
780 {
781         int ret = -ENOMEM;
782         struct btree *i;
783
784         if (!cl)
785                 return ERR_PTR(-ENOMEM);
786
787         /*
788          * Trying to free up some memory - i.e. reuse some btree nodes - may
789          * require initiating IO to flush the dirty part of the node. If we're
790          * running under generic_make_request(), that IO will never finish and
791          * we would deadlock. Returning -EAGAIN causes the cache lookup code to
792          * punt to workqueue and retry.
793          */
794         if (current->bio_list)
795                 return ERR_PTR(-EAGAIN);
796
797         if (c->try_harder && c->try_harder != cl) {
798                 closure_wait_event_async(&c->try_wait, cl, !c->try_harder);
799                 return ERR_PTR(-EAGAIN);
800         }
801
802         /* XXX: tracepoint */
803         c->try_harder = cl;
804         c->try_harder_start = local_clock();
805 retry:
806         list_for_each_entry_reverse(i, &c->btree_cache, list) {
807                 int r = mca_reap(i, cl, btree_order(k));
808                 if (!r)
809                         return i;
810                 if (r != -ENOMEM)
811                         ret = r;
812         }
813
814         if (ret == -EAGAIN &&
815             closure_blocking(cl)) {
816                 mutex_unlock(&c->bucket_lock);
817                 closure_sync(cl);
818                 mutex_lock(&c->bucket_lock);
819                 goto retry;
820         }
821
822         return ERR_PTR(ret);
823 }
824
825 /*
826  * We can only have one thread cannibalizing other cached btree nodes at a time,
827  * or we'll deadlock. We use an open-coded mutex to ensure that, which
828  * mca_cannibalize() will take. This means every time we unlock the root of
829  * the btree, we need to release this lock if we have it held.
830  */
831 void bch_cannibalize_unlock(struct cache_set *c, struct closure *cl)
832 {
833         if (c->try_harder == cl) {
834                 bch_time_stats_update(&c->try_harder_time, c->try_harder_start);
835                 c->try_harder = NULL;
836                 __closure_wake_up(&c->try_wait);
837         }
838 }
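
/*
 * Hedged usage sketch (hypothetical caller, not part of the driver):
 * mca_cannibalize() takes c->try_harder on behalf of a closure, and the
 * comment above says the holder must drop it whenever the btree root is
 * unlocked, by calling bch_cannibalize_unlock().
 */
#if 0
static void example_drop_root_lock(struct cache_set *c, struct btree_op *op)
{
	/* After releasing the root's lock, release the cannibalize lock too. */
	bch_cannibalize_unlock(c, &op->cl);
}
#endif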
839
840 static struct btree *mca_alloc(struct cache_set *c, struct bkey *k,
841                                int level, struct closure *cl)
842 {
843         struct btree *b;
844
845         lockdep_assert_held(&c->bucket_lock);
846
847         if (mca_find(c, k))
848                 return NULL;
849
850         /* btree_node_free() doesn't free memory; it sticks the node on the end of
851          * the list. Check if there are any freed nodes there:
852          */
853         list_for_each_entry(b, &c->btree_cache_freeable, list)
854                 if (!mca_reap(b, NULL, btree_order(k)))
855                         goto out;
856
857         /* We never free struct btree itself, just the memory that holds the on
858          * disk node. Check the freed list before allocating a new one:
859          */
860         list_for_each_entry(b, &c->btree_cache_freed, list)
861                 if (!mca_reap(b, NULL, 0)) {
862                         mca_data_alloc(b, k, __GFP_NOWARN|GFP_NOIO);
863                         if (!b->sets[0].data)
864                                 goto err;
865                         else
866                                 goto out;
867                 }
868
869         b = mca_bucket_alloc(c, k, __GFP_NOWARN|GFP_NOIO);
870         if (!b)
871                 goto err;
872
873         BUG_ON(!down_write_trylock(&b->lock));
874         if (!b->sets->data)
875                 goto err;
876 out:
877         BUG_ON(!closure_is_unlocked(&b->io.cl));
878
879         bkey_copy(&b->key, k);
880         list_move(&b->list, &c->btree_cache);
881         hlist_del_init_rcu(&b->hash);
882         hlist_add_head_rcu(&b->hash, mca_hash(c, k));
883
884         lock_set_subclass(&b->lock.dep_map, level + 1, _THIS_IP_);
885         b->level        = level;
886
887         mca_reinit(b);
888
889         return b;
890 err:
891         if (b)
892                 rw_unlock(true, b);
893
894         b = mca_cannibalize(c, k, level, cl);
895         if (!IS_ERR(b))
896                 goto out;
897
898         return b;
899 }
900
901 /**
902  * bch_btree_node_get - find a btree node in the cache and lock it, reading it
903  * in from disk if necessary.
904  *
905  * If IO is necessary, it uses the closure embedded in struct btree_op to wait;
906  * if that closure is in non-blocking mode, it will return -EAGAIN.
907  *
908  * The btree node will have either a read or a write lock held, depending on
909  * level and op->lock.
910  */
911 struct btree *bch_btree_node_get(struct cache_set *c, struct bkey *k,
912                                  int level, struct btree_op *op)
913 {
914         int i = 0;
915         bool write = level <= op->lock;
916         struct btree *b;
917
918         BUG_ON(level < 0);
919 retry:
920         b = mca_find(c, k);
921
922         if (!b) {
923                 mutex_lock(&c->bucket_lock);
924                 b = mca_alloc(c, k, level, &op->cl);
925                 mutex_unlock(&c->bucket_lock);
926
927                 if (!b)
928                         goto retry;
929                 if (IS_ERR(b))
930                         return b;
931
932                 bch_btree_read(b);
933
934                 if (!write)
935                         downgrade_write(&b->lock);
936         } else {
937                 rw_lock(write, b, level);
938                 if (PTR_HASH(c, &b->key) != PTR_HASH(c, k)) {
939                         rw_unlock(write, b);
940                         goto retry;
941                 }
942                 BUG_ON(b->level != level);
943         }
944
945         b->accessed = 1;
946
947         for (; i <= b->nsets && b->sets[i].size; i++) {
948                 prefetch(b->sets[i].tree);
949                 prefetch(b->sets[i].data);
950         }
951
952         for (; i <= b->nsets; i++)
953                 prefetch(b->sets[i].data);
954
955         if (!closure_wait_event(&b->io.wait, &op->cl,
956                                 btree_node_read_done(b))) {
957                 rw_unlock(write, b);
958                 b = ERR_PTR(-EAGAIN);
959         } else if (btree_node_io_error(b)) {
960                 rw_unlock(write, b);
961                 b = ERR_PTR(-EIO);
962         } else
963                 BUG_ON(!b->written);
964
965         return b;
966 }
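
/*
 * Hedged usage sketch (hypothetical, not part of the driver): per the
 * comment above bch_btree_node_get(), the node comes back write-locked
 * when level <= op->lock and read-locked otherwise, and the caller must
 * handle ERR_PTR(-EAGAIN) when op's closure is non-blocking.
 */
#if 0
static int example_walk_child(struct cache_set *c, struct bkey *k,
			      int level, struct btree_op *op)
{
	struct btree *b = bch_btree_node_get(c, k, level, op);

	if (IS_ERR(b))
		return PTR_ERR(b);	/* -EAGAIN, -EIO, ... */

	/* ... examine b->sets[] here ... */

	/* Unlock with the same mode the helper chose. */
	rw_unlock(level <= op->lock, b);
	return 0;
}
#endif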
967
968 static void btree_node_prefetch(struct cache_set *c, struct bkey *k, int level)
969 {
970         struct btree *b;
971
972         mutex_lock(&c->bucket_lock);
973         b = mca_alloc(c, k, level, NULL);
974         mutex_unlock(&c->bucket_lock);
975
976         if (!IS_ERR_OR_NULL(b)) {
977                 bch_btree_read(b);
978                 rw_unlock(true, b);
979         }
980 }
981
982 /* Btree alloc */
983
984 static void btree_node_free(struct btree *b, struct btree_op *op)
985 {
986         unsigned i;
987
988         /*
989          * The BUG_ON() in btree_node_get() implies that we must have a write
990          * lock on parent to free or even invalidate a node
991          */
992         BUG_ON(op->lock <= b->level);
993         BUG_ON(b == b->c->root);
994         pr_debug("bucket %s", pbtree(b));
995
996         if (btree_node_dirty(b))
997                 btree_complete_write(b, btree_current_write(b));
998         clear_bit(BTREE_NODE_dirty, &b->flags);
999
1000         if (b->prio_blocked &&
1001             !atomic_sub_return(b->prio_blocked, &b->c->prio_blocked))
1002                 wake_up(&b->c->alloc_wait);
1003
1004         b->prio_blocked = 0;
1005
1006         cancel_delayed_work(&b->work);
1007
1008         mutex_lock(&b->c->bucket_lock);
1009
1010         for (i = 0; i < KEY_PTRS(&b->key); i++) {
1011                 BUG_ON(atomic_read(&PTR_BUCKET(b->c, &b->key, i)->pin));
1012
1013                 bch_inc_gen(PTR_CACHE(b->c, &b->key, i),
1014                             PTR_BUCKET(b->c, &b->key, i));
1015         }
1016
1017         bch_bucket_free(b->c, &b->key);
1018         mca_bucket_free(b);
1019         mutex_unlock(&b->c->bucket_lock);
1020 }
1021
1022 struct btree *bch_btree_node_alloc(struct cache_set *c, int level,
1023                                    struct closure *cl)
1024 {
1025         BKEY_PADDED(key) k;
1026         struct btree *b = ERR_PTR(-EAGAIN);
1027
1028         mutex_lock(&c->bucket_lock);
1029 retry:
1030         if (__bch_bucket_alloc_set(c, WATERMARK_METADATA, &k.key, 1, cl))
1031                 goto err;
1032
1033         SET_KEY_SIZE(&k.key, c->btree_pages * PAGE_SECTORS);
1034
1035         b = mca_alloc(c, &k.key, level, cl);
1036         if (IS_ERR(b))
1037                 goto err_free;
1038
1039         if (!b) {
1040                 cache_bug(c,
1041                         "Tried to allocate bucket that was in btree cache");
1042                 __bkey_put(c, &k.key);
1043                 goto retry;
1044         }
1045
1046         set_btree_node_read_done(b);
1047         b->accessed = 1;
1048         bch_bset_init_next(b);
1049
1050         mutex_unlock(&c->bucket_lock);
1051         return b;
1052 err_free:
1053         bch_bucket_free(c, &k.key);
1054         __bkey_put(c, &k.key);
1055 err:
1056         mutex_unlock(&c->bucket_lock);
1057         return b;
1058 }
1059
1060 static struct btree *btree_node_alloc_replacement(struct btree *b,
1061                                                   struct closure *cl)
1062 {
1063         struct btree *n = bch_btree_node_alloc(b->c, b->level, cl);
1064         if (!IS_ERR_OR_NULL(n))
1065                 bch_btree_sort_into(b, n);
1066
1067         return n;
1068 }
1069
1070 /* Garbage collection */
1071
1072 uint8_t __bch_btree_mark_key(struct cache_set *c, int level, struct bkey *k)
1073 {
1074         uint8_t stale = 0;
1075         unsigned i;
1076         struct bucket *g;
1077
1078         /*
1079          * ptr_invalid() can't return true for the keys that mark btree nodes as
1080          * freed, but since ptr_bad() returns true we'll never actually use them
1081          * for anything and thus we don't want to mark their pointers here
1082          */
1083         if (!bkey_cmp(k, &ZERO_KEY))
1084                 return stale;
1085
1086         for (i = 0; i < KEY_PTRS(k); i++) {
1087                 if (!ptr_available(c, k, i))
1088                         continue;
1089
1090                 g = PTR_BUCKET(c, k, i);
1091
1092                 if (gen_after(g->gc_gen, PTR_GEN(k, i)))
1093                         g->gc_gen = PTR_GEN(k, i);
1094
1095                 if (ptr_stale(c, k, i)) {
1096                         stale = max(stale, ptr_stale(c, k, i));
1097                         continue;
1098                 }
1099
1100                 cache_bug_on(GC_MARK(g) &&
1101                              (GC_MARK(g) == GC_MARK_METADATA) != (level != 0),
1102                              c, "inconsistent ptrs: mark = %llu, level = %i",
1103                              GC_MARK(g), level);
1104
1105                 if (level)
1106                         SET_GC_MARK(g, GC_MARK_METADATA);
1107                 else if (KEY_DIRTY(k))
1108                         SET_GC_MARK(g, GC_MARK_DIRTY);
1109
1110                 /* guard against overflow */
1111                 SET_GC_SECTORS_USED(g, min_t(unsigned,
1112                                              GC_SECTORS_USED(g) + KEY_SIZE(k),
1113                                              (1 << 14) - 1));
1114
1115                 BUG_ON(!GC_SECTORS_USED(g));
1116         }
1117
1118         return stale;
1119 }
1120
1121 #define btree_mark_key(b, k)    __bch_btree_mark_key(b->c, b->level, k)
1122
1123 static int btree_gc_mark_node(struct btree *b, unsigned *keys,
1124                               struct gc_stat *gc)
1125 {
1126         uint8_t stale = 0;
1127         unsigned last_dev = -1;
1128         struct bcache_device *d = NULL;
1129         struct bkey *k;
1130         struct btree_iter iter;
1131         struct bset_tree *t;
1132
1133         gc->nodes++;
1134
1135         for_each_key_filter(b, k, &iter, bch_ptr_invalid) {
1136                 if (last_dev != KEY_INODE(k)) {
1137                         last_dev = KEY_INODE(k);
1138
1139                         d = KEY_INODE(k) < b->c->nr_uuids
1140                                 ? b->c->devices[last_dev]
1141                                 : NULL;
1142                 }
1143
1144                 stale = max(stale, btree_mark_key(b, k));
1145
1146                 if (bch_ptr_bad(b, k))
1147                         continue;
1148
1149                 *keys += bkey_u64s(k);
1150
1151                 gc->key_bytes += bkey_u64s(k);
1152                 gc->nkeys++;
1153
1154                 gc->data += KEY_SIZE(k);
1155                 if (KEY_DIRTY(k)) {
1156                         gc->dirty += KEY_SIZE(k);
1157                         if (d)
1158                                 d->sectors_dirty_gc += KEY_SIZE(k);
1159                 }
1160         }
1161
1162         for (t = b->sets; t <= &b->sets[b->nsets]; t++)
1163                 btree_bug_on(t->size &&
1164                              bset_written(b, t) &&
1165                              bkey_cmp(&b->key, &t->end) < 0,
1166                              b, "found short btree key in gc");
1167
1168         return stale;
1169 }
1170
1171 static struct btree *btree_gc_alloc(struct btree *b, struct bkey *k,
1172                                     struct btree_op *op)
1173 {
1174         /*
1175          * We block priorities from being written for the duration of garbage
1176          * collection, so we can't sleep in btree_alloc() ->
1177          * bch_bucket_alloc_set(), or we'd risk deadlock - so we don't pass it
1178          * our closure.
1179          */
1180         struct btree *n = btree_node_alloc_replacement(b, NULL);
1181
1182         if (!IS_ERR_OR_NULL(n)) {
1183                 swap(b, n);
1184
1185                 memcpy(k->ptr, b->key.ptr,
1186                        sizeof(uint64_t) * KEY_PTRS(&b->key));
1187
1188                 __bkey_put(b->c, &b->key);
1189                 atomic_inc(&b->c->prio_blocked);
1190                 b->prio_blocked++;
1191
1192                 btree_node_free(n, op);
1193                 up_write(&n->lock);
1194         }
1195
1196         return b;
1197 }
1198
1199 /*
1200  * Leaving this at 2 until we've got incremental garbage collection done; it
1201  * could be higher (and has been tested with 4) except that garbage collection
1202  * could take much longer, adversely affecting latency.
1203  */
1204 #define GC_MERGE_NODES  2U
1205
1206 struct gc_merge_info {
1207         struct btree    *b;
1208         struct bkey     *k;
1209         unsigned        keys;
1210 };
1211
1212 static void btree_gc_coalesce(struct btree *b, struct btree_op *op,
1213                               struct gc_stat *gc, struct gc_merge_info *r)
1214 {
1215         unsigned nodes = 0, keys = 0, blocks;
1216         int i;
1217
1218         while (nodes < GC_MERGE_NODES && r[nodes].b)
1219                 keys += r[nodes++].keys;
1220
1221         blocks = btree_default_blocks(b->c) * 2 / 3;
1222
1223         if (nodes < 2 ||
1224             __set_blocks(b->sets[0].data, keys, b->c) > blocks * (nodes - 1))
1225                 return;
1226
1227         for (i = nodes - 1; i >= 0; --i) {
1228                 if (r[i].b->written)
1229                         r[i].b = btree_gc_alloc(r[i].b, r[i].k, op);
1230
1231                 if (r[i].b->written)
1232                         return;
1233         }
1234
1235         for (i = nodes - 1; i > 0; --i) {
1236                 struct bset *n1 = r[i].b->sets->data;
1237                 struct bset *n2 = r[i - 1].b->sets->data;
1238                 struct bkey *k, *last = NULL;
1239
1240                 keys = 0;
1241
1242                 if (i == 1) {
1243                         /*
1244                          * Last node we're not getting rid of - we're getting
1245                          * rid of the node at r[0]. Have to try and fit all of
1246                          * the remaining keys into this node; we can't ensure
1247                          * they will always fit due to rounding and variable
1248                          * length keys (shouldn't be possible in practice,
1249                          * though)
1250                          */
1251                         if (__set_blocks(n1, n1->keys + r->keys,
1252                                          b->c) > btree_blocks(r[i].b))
1253                                 return;
1254
1255                         keys = n2->keys;
1256                         last = &r->b->key;
1257                 } else
1258                         for (k = n2->start;
1259                              k < end(n2);
1260                              k = bkey_next(k)) {
1261                                 if (__set_blocks(n1, n1->keys + keys +
1262                                                  bkey_u64s(k), b->c) > blocks)
1263                                         break;
1264
1265                                 last = k;
1266                                 keys += bkey_u64s(k);
1267                         }
1268
1269                 BUG_ON(__set_blocks(n1, n1->keys + keys,
1270                                     b->c) > btree_blocks(r[i].b));
1271
1272                 if (last) {
1273                         bkey_copy_key(&r[i].b->key, last);
1274                         bkey_copy_key(r[i].k, last);
1275                 }
1276
1277                 memcpy(end(n1),
1278                        n2->start,
1279                        (void *) node(n2, keys) - (void *) n2->start);
1280
1281                 n1->keys += keys;
1282
1283                 memmove(n2->start,
1284                         node(n2, keys),
1285                         (void *) end(n2) - (void *) node(n2, keys));
1286
1287                 n2->keys -= keys;
1288
1289                 r[i].keys       = n1->keys;
1290                 r[i - 1].keys   = n2->keys;
1291         }
1292
1293         btree_node_free(r->b, op);
1294         up_write(&r->b->lock);
1295
1296         pr_debug("coalesced %u nodes", nodes);
1297
1298         gc->nodes--;
1299         nodes--;
1300
1301         memmove(&r[0], &r[1], sizeof(struct gc_merge_info) * nodes);
1302         memset(&r[nodes], 0, sizeof(struct gc_merge_info));
1303 }
1304
1305 static int btree_gc_recurse(struct btree *b, struct btree_op *op,
1306                             struct closure *writes, struct gc_stat *gc)
1307 {
1308         void write(struct btree *r)
1309         {
1310                 if (!r->written)
1311                         bch_btree_write(r, true, op);
1312                 else if (btree_node_dirty(r)) {
1313                         BUG_ON(btree_current_write(r)->owner);
1314                         btree_current_write(r)->owner = writes;
1315                         closure_get(writes);
1316
1317                         bch_btree_write(r, true, NULL);
1318                 }
1319
1320                 up_write(&r->lock);
1321         }
1322
1323         int ret = 0, stale;
1324         unsigned i;
1325         struct gc_merge_info r[GC_MERGE_NODES];
1326
1327         memset(r, 0, sizeof(r));
1328
1329         while ((r->k = bch_next_recurse_key(b, &b->c->gc_done))) {
1330                 r->b = bch_btree_node_get(b->c, r->k, b->level - 1, op);
1331
1332                 if (IS_ERR(r->b)) {
1333                         ret = PTR_ERR(r->b);
1334                         break;
1335                 }
1336
1337                 r->keys = 0;
1338                 stale = btree_gc_mark_node(r->b, &r->keys, gc);
1339
1340                 if (!b->written &&
1341                     (r->b->level || stale > 10 ||
1342                      b->c->gc_always_rewrite))
1343                         r->b = btree_gc_alloc(r->b, r->k, op);
1344
1345                 if (r->b->level)
1346                         ret = btree_gc_recurse(r->b, op, writes, gc);
1347
1348                 if (ret) {
1349                         write(r->b);
1350                         break;
1351                 }
1352
1353                 bkey_copy_key(&b->c->gc_done, r->k);
1354
1355                 if (!b->written)
1356                         btree_gc_coalesce(b, op, gc, r);
1357
1358                 if (r[GC_MERGE_NODES - 1].b)
1359                         write(r[GC_MERGE_NODES - 1].b);
1360
1361                 memmove(&r[1], &r[0],
1362                         sizeof(struct gc_merge_info) * (GC_MERGE_NODES - 1));
1363
1364                 /* When we've got incremental GC working, we'll want to do
1365                  * if (should_resched())
1366                  *      return -EAGAIN;
1367                  */
1368                 cond_resched();
1369 #if 0
1370                 if (need_resched()) {
1371                         ret = -EAGAIN;
1372                         break;
1373                 }
1374 #endif
1375         }
1376
1377         for (i = 1; i < GC_MERGE_NODES && r[i].b; i++)
1378                 write(r[i].b);
1379
1380         /* Might have freed some children, must remove their keys */
1381         if (!b->written)
1382                 bch_btree_sort(b);
1383
1384         return ret;
1385 }
1386
1387 static int bch_btree_gc_root(struct btree *b, struct btree_op *op,
1388                              struct closure *writes, struct gc_stat *gc)
1389 {
1390         struct btree *n = NULL;
1391         unsigned keys = 0;
1392         int ret = 0, stale = btree_gc_mark_node(b, &keys, gc);
1393
1394         if (b->level || stale > 10)
1395                 n = btree_node_alloc_replacement(b, NULL);
1396
1397         if (!IS_ERR_OR_NULL(n))
1398                 swap(b, n);
1399
1400         if (b->level)
1401                 ret = btree_gc_recurse(b, op, writes, gc);
1402
1403         if (!b->written || btree_node_dirty(b)) {
1404                 atomic_inc(&b->c->prio_blocked);
1405                 b->prio_blocked++;
1406                 bch_btree_write(b, true, n ? op : NULL);
1407         }
1408
1409         if (!IS_ERR_OR_NULL(n)) {
1410                 closure_sync(&op->cl);
1411                 bch_btree_set_root(b);
1412                 btree_node_free(n, op);
1413                 rw_unlock(true, b);
1414         }
1415
1416         return ret;
1417 }
1418
1419 static void btree_gc_start(struct cache_set *c)
1420 {
1421         struct cache *ca;
1422         struct bucket *b;
1423         struct bcache_device **d;
1424         unsigned i;
1425
1426         if (!c->gc_mark_valid)
1427                 return;
1428
1429         mutex_lock(&c->bucket_lock);
1430
1431         c->gc_mark_valid = 0;
1432         c->gc_done = ZERO_KEY;
1433
1434         for_each_cache(ca, c, i)
1435                 for_each_bucket(b, ca) {
1436                         b->gc_gen = b->gen;
1437                         if (!atomic_read(&b->pin)) {
1438                                 SET_GC_MARK(b, GC_MARK_RECLAIMABLE);
1439                                 SET_GC_SECTORS_USED(b, 0);
1440                         }
1441                 }
1442
1443         for (d = c->devices;
1444              d < c->devices + c->nr_uuids;
1445              d++)
1446                 if (*d)
1447                         (*d)->sectors_dirty_gc = 0;
1448
1449         mutex_unlock(&c->bucket_lock);
1450 }
1451
1452 size_t bch_btree_gc_finish(struct cache_set *c)
1453 {
1454         size_t available = 0;
1455         struct bucket *b;
1456         struct cache *ca;
1457         struct bcache_device **d;
1458         unsigned i;
1459
1460         mutex_lock(&c->bucket_lock);
1461
1462         set_gc_sectors(c);
1463         c->gc_mark_valid = 1;
1464         c->need_gc      = 0;
1465
1466         if (c->root)
1467                 for (i = 0; i < KEY_PTRS(&c->root->key); i++)
1468                         SET_GC_MARK(PTR_BUCKET(c, &c->root->key, i),
1469                                     GC_MARK_METADATA);
1470
1471         for (i = 0; i < KEY_PTRS(&c->uuid_bucket); i++)
1472                 SET_GC_MARK(PTR_BUCKET(c, &c->uuid_bucket, i),
1473                             GC_MARK_METADATA);
1474
1475         for_each_cache(ca, c, i) {
1476                 uint64_t *i;
1477
1478                 ca->invalidate_needs_gc = 0;
1479
1480                 for (i = ca->sb.d; i < ca->sb.d + ca->sb.keys; i++)
1481                         SET_GC_MARK(ca->buckets + *i, GC_MARK_METADATA);
1482
1483                 for (i = ca->prio_buckets;
1484                      i < ca->prio_buckets + prio_buckets(ca) * 2; i++)
1485                         SET_GC_MARK(ca->buckets + *i, GC_MARK_METADATA);
1486
1487                 for_each_bucket(b, ca) {
1488                         b->last_gc      = b->gc_gen;
1489                         c->need_gc      = max(c->need_gc, bucket_gc_gen(b));
1490
1491                         if (!atomic_read(&b->pin) &&
1492                             GC_MARK(b) == GC_MARK_RECLAIMABLE) {
1493                                 available++;
1494                                 if (!GC_SECTORS_USED(b))
1495                                         bch_bucket_add_unused(ca, b);
1496                         }
1497                 }
1498         }
1499
1500         for (d = c->devices;
1501              d < c->devices + c->nr_uuids;
1502              d++)
1503                 if (*d) {
1504                         unsigned long last =
1505                                 atomic_long_read(&((*d)->sectors_dirty));
1506                         long difference = (*d)->sectors_dirty_gc - last;
1507
1508                         pr_debug("sectors dirty off by %li", difference);
1509
1510                         (*d)->sectors_dirty_last += difference;
1511
1512                         atomic_long_set(&((*d)->sectors_dirty),
1513                                         (*d)->sectors_dirty_gc);
1514                 }
1515
1516         mutex_unlock(&c->bucket_lock);
1517         return available;
1518 }
1519
1520 static void bch_btree_gc(struct closure *cl)
1521 {
1522         struct cache_set *c = container_of(cl, struct cache_set, gc.cl);
1523         int ret;
1524         unsigned long available;
1525         struct gc_stat stats;
1526         struct closure writes;
1527         struct btree_op op;
1528
1529         uint64_t start_time = local_clock();
1530         trace_bcache_gc_start(c->sb.set_uuid);
1531         blktrace_msg_all(c, "Starting gc");
1532
1533         memset(&stats, 0, sizeof(struct gc_stat));
1534         closure_init_stack(&writes);
1535         bch_btree_op_init_stack(&op);
1536         op.lock = SHRT_MAX;
1537
1538         btree_gc_start(c);
1539
1540         ret = btree_root(gc_root, c, &op, &writes, &stats);
1541         closure_sync(&op.cl);
1542         closure_sync(&writes);
1543
1544         if (ret) {
1545                 blktrace_msg_all(c, "Stopped gc");
1546                 pr_warn("gc failed!");
1547
1548                 continue_at(cl, bch_btree_gc, bch_gc_wq);
1549         }
1550
1551         /* Possibly wait for new UUIDs or whatever to hit disk */
1552         bch_journal_meta(c, &op.cl);
1553         closure_sync(&op.cl);
1554
1555         available = bch_btree_gc_finish(c);
1556
1557         bch_time_stats_update(&c->btree_gc_time, start_time);
1558
1559         stats.key_bytes *= sizeof(uint64_t);
1560         stats.dirty     <<= 9;
1561         stats.data      <<= 9;
1562         stats.in_use    = (c->nbuckets - available) * 100 / c->nbuckets;
1563         memcpy(&c->gc_stats, &stats, sizeof(struct gc_stat));
1564         blktrace_msg_all(c, "Finished gc");
1565
1566         trace_bcache_gc_end(c->sb.set_uuid);
1567         wake_up(&c->alloc_wait);
1568
1569         continue_at(cl, bch_moving_gc, bch_gc_wq);
1570 }
1571
1572 void bch_queue_gc(struct cache_set *c)
1573 {
1574         closure_trylock_call(&c->gc.cl, bch_btree_gc, bch_gc_wq, &c->cl);
1575 }
1576
1577 /* Initial partial gc */
1578
1579 static int bch_btree_check_recurse(struct btree *b, struct btree_op *op,
1580                                    unsigned long **seen)
1581 {
1582         int ret;
1583         unsigned i;
1584         struct bkey *k;
1585         struct bucket *g;
1586         struct btree_iter iter;
1587
1588         for_each_key_filter(b, k, &iter, bch_ptr_invalid) {
1589                 for (i = 0; i < KEY_PTRS(k); i++) {
1590                         if (!ptr_available(b->c, k, i))
1591                                 continue;
1592
1593                         g = PTR_BUCKET(b->c, k, i);
1594
1595                         if (!__test_and_set_bit(PTR_BUCKET_NR(b->c, k, i),
1596                                                 seen[PTR_DEV(k, i)]) ||
1597                             !ptr_stale(b->c, k, i)) {
1598                                 g->gen = PTR_GEN(k, i);
1599
1600                                 if (b->level)
1601                                         g->prio = BTREE_PRIO;
1602                                 else if (g->prio == BTREE_PRIO)
1603                                         g->prio = INITIAL_PRIO;
1604                         }
1605                 }
1606
1607                 btree_mark_key(b, k);
1608         }
1609
1610         if (b->level) {
1611                 k = bch_next_recurse_key(b, &ZERO_KEY);
1612
1613                 while (k) {
1614                         struct bkey *p = bch_next_recurse_key(b, k);
1615                         if (p)
1616                                 btree_node_prefetch(b->c, p, b->level - 1);
1617
1618                         ret = btree(check_recurse, k, b, op, seen);
1619                         if (ret)
1620                                 return ret;
1621
1622                         k = p;
1623                 }
1624         }
1625
1626         return 0;
1627 }
1628
1629 int bch_btree_check(struct cache_set *c, struct btree_op *op)
1630 {
1631         int ret = -ENOMEM;
1632         unsigned i;
1633         unsigned long *seen[MAX_CACHES_PER_SET];
1634
1635         memset(seen, 0, sizeof(seen));
1636
1637         for (i = 0; c->cache[i]; i++) {
1638                 size_t n = DIV_ROUND_UP(c->cache[i]->sb.nbuckets, 8);
1639                 seen[i] = kmalloc(n, GFP_KERNEL);
1640                 if (!seen[i])
1641                         goto err;
1642
1643                 /* Disables the seen array until prio_read() uses it too */
1644                 memset(seen[i], 0xFF, n);
1645         }
1646
1647         ret = btree_root(check_recurse, c, op, seen);
1648 err:
1649         for (i = 0; i < MAX_CACHES_PER_SET; i++)
1650                 kfree(seen[i]);
1651         return ret;
1652 }
1653
1654 /* Btree insertion */
1655
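/*
 * Make room for @insert at @where in the last (still unwritten) bset: shift
 * everything from @where to the end of the bset up by bkey_u64s(insert)
 * u64s, then copy the new key into the gap and fix up the lookup table.
 */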
1656 static void shift_keys(struct btree *b, struct bkey *where, struct bkey *insert)
1657 {
1658         struct bset *i = b->sets[b->nsets].data;
1659
1660         memmove((uint64_t *) where + bkey_u64s(insert),
1661                 where,
1662                 (void *) end(i) - (void *) where);
1663
1664         i->keys += bkey_u64s(insert);
1665         bkey_copy(where, insert);
1666         bch_bset_fix_lookup_table(b, where);
1667 }
1668
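/*
 * Trim or split existing extents that overlap @insert, adjusting the dirty
 * sector counts as data is overwritten. Returns true if @insert should be
 * dropped entirely, which happens when a BTREE_REPLACE operation finds none
 * of the key it was supposed to replace.
 */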
1669 static bool fix_overlapping_extents(struct btree *b,
1670                                     struct bkey *insert,
1671                                     struct btree_iter *iter,
1672                                     struct btree_op *op)
1673 {
1674         void subtract_dirty(struct bkey *k, int sectors)
1675         {
1676                 struct bcache_device *d = b->c->devices[KEY_INODE(k)];
1677
1678                 if (KEY_DIRTY(k) && d)
1679                         atomic_long_sub(sectors, &d->sectors_dirty);
1680         }
1681
1682         unsigned old_size, sectors_found = 0;
1683
1684         while (1) {
1685                 struct bkey *k = bch_btree_iter_next(iter);
1686                 if (!k ||
1687                     bkey_cmp(&START_KEY(k), insert) >= 0)
1688                         break;
1689
1690                 if (bkey_cmp(k, &START_KEY(insert)) <= 0)
1691                         continue;
1692
1693                 old_size = KEY_SIZE(k);
1694
1695                 /*
1696                  * We might overlap with zero-size extents; we can't skip
1697                  * those, because if they're in the set we're inserting to
1698                  * we have to adjust them so they don't overlap with the
1699                  * key we're inserting. But we don't want to check them
1700                  * against BTREE_REPLACE operations.
1701                  */
1702
1703                 if (op->type == BTREE_REPLACE &&
1704                     KEY_SIZE(k)) {
1705                         /*
1706                          * k might have been split since we inserted/found the
1707                          * key we're replacing
1708                          */
1709                         unsigned i;
1710                         uint64_t offset = KEY_START(k) -
1711                                 KEY_START(&op->replace);
1712
1713                         /* But it must be a subset of the replace key */
1714                         if (KEY_START(k) < KEY_START(&op->replace) ||
1715                             KEY_OFFSET(k) > KEY_OFFSET(&op->replace))
1716                                 goto check_failed;
1717
1718                         /* We didn't find a key that we were supposed to */
1719                         if (KEY_START(k) > KEY_START(insert) + sectors_found)
1720                                 goto check_failed;
1721
1722                         if (KEY_PTRS(&op->replace) != KEY_PTRS(k))
1723                                 goto check_failed;
1724
1725                         /* skip past gen: the low 8 bits of a raw ptr hold the bucket generation */
1726                         offset <<= 8;
1727
1728                         BUG_ON(!KEY_PTRS(&op->replace));
1729
1730                         for (i = 0; i < KEY_PTRS(&op->replace); i++)
1731                                 if (k->ptr[i] != op->replace.ptr[i] + offset)
1732                                         goto check_failed;
1733
1734                         sectors_found = KEY_OFFSET(k) - KEY_START(insert);
1735                 }
1736
1737                 if (bkey_cmp(insert, k) < 0 &&
1738                     bkey_cmp(&START_KEY(insert), &START_KEY(k)) > 0) {
1739                         /*
1740                          * We overlapped in the middle of an existing key: that
1741                          * means we have to split the old key. But we have to do
1742                          * slightly different things depending on whether the
1743                          * old key has been written out yet.
1744                          */
1745
1746                         struct bkey *top;
1747
1748                         subtract_dirty(k, KEY_SIZE(insert));
1749
1750                         if (bkey_written(b, k)) {
1751                                 /*
1752                                  * We insert a new key to cover the top of the
1753                                  * old key, and the old key is modified in place
1754                                  * to represent the bottom split.
1755                                  *
1756                                  * It's completely arbitrary whether the new key
1757                                  * is the top or the bottom, but it has to match
1758                                  * up with what btree_sort_fixup() does - it
1759                                  * doesn't check for this kind of overlap, it
1760                                  * depends on us inserting a new key for the top
1761                                  * here.
1762                                  */
1763                                 top = bch_bset_search(b, &b->sets[b->nsets],
1764                                                       insert);
1765                                 shift_keys(b, top, k);
1766                         } else {
1767                                 BKEY_PADDED(key) temp;
1768                                 bkey_copy(&temp.key, k);
1769                                 shift_keys(b, k, &temp.key);
1770                                 top = bkey_next(k);
1771                         }
1772
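                        /*
                         * Now trim the two copies: @top keeps the part of the
                         * old extent above @insert, @k keeps the part below
                         * its start.
                         */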
1773                         bch_cut_front(insert, top);
1774                         bch_cut_back(&START_KEY(insert), k);
1775                         bch_bset_fix_invalidated_key(b, k);
1776                         return false;
1777                 }
1778
1779                 if (bkey_cmp(insert, k) < 0) {
1780                         bch_cut_front(insert, k);
1781                 } else {
1782                         if (bkey_written(b, k) &&
1783                             bkey_cmp(&START_KEY(insert), &START_KEY(k)) <= 0) {
1784                                 /*
1785                                  * Completely overwrote, so we don't have to
1786                                  * invalidate the binary search tree
1787                                  */
1788                                 bch_cut_front(k, k);
1789                         } else {
1790                                 __bch_cut_back(&START_KEY(insert), k);
1791                                 bch_bset_fix_invalidated_key(b, k);
1792                         }
1793                 }
1794
1795                 subtract_dirty(k, old_size - KEY_SIZE(k));
1796         }
1797
1798 check_failed:
1799         if (op->type == BTREE_REPLACE) {
1800                 if (!sectors_found) {
1801                         op->insert_collision = true;
1802                         return true;
1803                 } else if (sectors_found < KEY_SIZE(insert)) {
1804                         SET_KEY_OFFSET(insert, KEY_OFFSET(insert) -
1805                                        (KEY_SIZE(insert) - sectors_found));
1806                         SET_KEY_SIZE(insert, sectors_found);
1807                 }
1808         }
1809
1810         return false;
1811 }
1812
1813 static bool btree_insert_key(struct btree *b, struct btree_op *op,
1814                              struct bkey *k)
1815 {
1816         struct bset *i = b->sets[b->nsets].data;
1817         struct bkey *m, *prev;
1818         const char *status = "insert";
1819
1820         BUG_ON(bkey_cmp(k, &b->key) > 0);
1821         BUG_ON(b->level && !KEY_PTRS(k));
1822         BUG_ON(!b->level && !KEY_OFFSET(k));
1823
1824         if (!b->level) {
1825                 struct btree_iter iter;
1826                 struct bkey search = KEY(KEY_INODE(k), KEY_START(k), 0);
1827
1828                 /*
1829                  * bset_search() returns the first key that is strictly greater
1830                  * than the search key - but for back merging, we want to find
1831                  * the first key that is greater than or equal to KEY_START(k) -
1832                  * unless KEY_START(k) is 0.
1833                  */
1834                 if (KEY_OFFSET(&search))
1835                         SET_KEY_OFFSET(&search, KEY_OFFSET(&search) - 1);
1836
1837                 prev = NULL;
1838                 m = bch_btree_iter_init(b, &iter, &search);
1839
1840                 if (fix_overlapping_extents(b, k, &iter, op))
1841                         return false;
1842
1843                 while (m != end(i) &&
1844                        bkey_cmp(k, &START_KEY(m)) > 0)
1845                         prev = m, m = bkey_next(m);
1846
1847                 if (key_merging_disabled(b->c))
1848                         goto insert;
1849
1850                 /* prev is in the tree, if we merge we're done */
1851                 status = "back merging";
1852                 if (prev &&
1853                     bch_bkey_try_merge(b, prev, k))
1854                         goto merged;
1855
1856                 status = "overwrote front";
1857                 if (m != end(i) &&
1858                     KEY_PTRS(m) == KEY_PTRS(k) && !KEY_SIZE(m))
1859                         goto copy;
1860
1861                 status = "front merge";
1862                 if (m != end(i) &&
1863                     bch_bkey_try_merge(b, k, m))
1864                         goto copy;
1865         } else
1866                 m = bch_bset_search(b, &b->sets[b->nsets], k);
1867
1868 insert: shift_keys(b, m, k);
1869 copy:   bkey_copy(m, k);
1870 merged:
1871         bch_check_keys(b, "%s for %s at %s: %s", status,
1872                        op_type(op), pbtree(b), pkey(k));
1873         bch_check_key_order_msg(b, i, "%s for %s at %s: %s", status,
1874                                 op_type(op), pbtree(b), pkey(k));
1875
1876         if (b->level && !KEY_OFFSET(k))
1877                 b->prio_blocked++;
1878
1879         pr_debug("%s for %s at %s: %s", status,
1880                  op_type(op), pbtree(b), pkey(k));
1881
1882         return true;
1883 }
1884
1885 bool bch_btree_insert_keys(struct btree *b, struct btree_op *op)
1886 {
1887         bool ret = false;
1888         struct bkey *k;
1889         unsigned oldsize = bch_count_data(b);
1890
1891         while ((k = bch_keylist_pop(&op->keys))) {
1892                 bkey_put(b->c, k, b->level);
1893                 ret |= btree_insert_key(b, op, k);
1894         }
1895
1896         BUG_ON(bch_count_data(b) < oldsize);
1897         return ret;
1898 }
1899
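/*
 * Insert a placeholder "check key" covering @bio's range: op->replace gets a
 * random pointer on PTR_CHECK_DEV, the idea being that the eventual data
 * insert, issued as a BTREE_REPLACE against op->replace, will fail if
 * anything else lands in this range in the meantime. We have to upgrade to a
 * write lock first, and bail out if the node changed or would need to split
 * while the lock was dropped.
 */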
1900 bool bch_btree_insert_check_key(struct btree *b, struct btree_op *op,
1901                                    struct bio *bio)
1902 {
1903         bool ret = false;
1904         uint64_t btree_ptr = b->key.ptr[0];
1905         unsigned long seq = b->seq;
1906         BKEY_PADDED(k) tmp;
1907
1908         rw_unlock(false, b);
1909         rw_lock(true, b, b->level);
1910
1911         if (b->key.ptr[0] != btree_ptr ||
1912             b->seq != seq + 1 ||
1913             should_split(b))
1914                 goto out;
1915
1916         op->replace = KEY(op->inode, bio_end(bio), bio_sectors(bio));
1917
1918         SET_KEY_PTRS(&op->replace, 1);
1919         get_random_bytes(&op->replace.ptr[0], sizeof(uint64_t));
1920
1921         SET_PTR_DEV(&op->replace, 0, PTR_CHECK_DEV);
1922
1923         bkey_copy(&tmp.k, &op->replace);
1924
1925         BUG_ON(op->type != BTREE_INSERT);
1926         BUG_ON(!btree_insert_key(b, op, &tmp.k));
1927         bch_btree_write(b, false, NULL);
1928         ret = true;
1929 out:
1930         downgrade_write(&b->lock);
1931         return ret;
1932 }
1933
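/*
 * Rewrite @b into one or two new nodes: n1 always replaces @b, n2 is only
 * allocated if the sorted result would be more than about 4/5 full, and n3
 * becomes the new root if we're splitting the root itself. Pointers to the
 * new node(s) are queued on op->keys so the parent picks them up on the way
 * back up the recursion (or the new root n3 consumes them directly), and the
 * old node is freed at the end.
 */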
1934 static int btree_split(struct btree *b, struct btree_op *op)
1935 {
1936         bool split, root = b == b->c->root;
1937         struct btree *n1, *n2 = NULL, *n3 = NULL;
1938         uint64_t start_time = local_clock();
1939
1940         if (b->level)
1941                 set_closure_blocking(&op->cl);
1942
1943         n1 = btree_node_alloc_replacement(b, &op->cl);
1944         if (IS_ERR(n1))
1945                 goto err;
1946
1947         split = set_blocks(n1->sets[0].data, n1->c) > (btree_blocks(b) * 4) / 5;
1948
1949         pr_debug("%ssplitting at %s keys %i", split ? "" : "not ",
1950                  pbtree(b), n1->sets[0].data->keys);
1951
1952         if (split) {
1953                 unsigned keys = 0;
1954
1955                 n2 = bch_btree_node_alloc(b->c, b->level, &op->cl);
1956                 if (IS_ERR(n2))
1957                         goto err_free1;
1958
1959                 if (root) {
1960                         n3 = bch_btree_node_alloc(b->c, b->level + 1, &op->cl);
1961                         if (IS_ERR(n3))
1962                                 goto err_free2;
1963                 }
1964
1965                 bch_btree_insert_keys(n1, op);
1966
1967                 /* Has to be a linear search because we don't have an auxiliary
1968                  * search tree yet
1969                  */
1970
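                /*
                 * Walk forward until roughly 3/5 of the u64s are behind us;
                 * n1 keeps everything up to that point and the rest is
                 * copied into n2 below.
                 */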
1971                 while (keys < (n1->sets[0].data->keys * 3) / 5)
1972                         keys += bkey_u64s(node(n1->sets[0].data, keys));
1973
1974                 bkey_copy_key(&n1->key, node(n1->sets[0].data, keys));
1975                 keys += bkey_u64s(node(n1->sets[0].data, keys));
1976
1977                 n2->sets[0].data->keys = n1->sets[0].data->keys - keys;
1978                 n1->sets[0].data->keys = keys;
1979
1980                 memcpy(n2->sets[0].data->start,
1981                        end(n1->sets[0].data),
1982                        n2->sets[0].data->keys * sizeof(uint64_t));
1983
1984                 bkey_copy_key(&n2->key, &b->key);
1985
1986                 bch_keylist_add(&op->keys, &n2->key);
1987                 bch_btree_write(n2, true, op);
1988                 rw_unlock(true, n2);
1989         } else
1990                 bch_btree_insert_keys(n1, op);
1991
1992         bch_keylist_add(&op->keys, &n1->key);
1993         bch_btree_write(n1, true, op);
1994
1995         if (n3) {
1996                 bkey_copy_key(&n3->key, &MAX_KEY);
1997                 bch_btree_insert_keys(n3, op);
1998                 bch_btree_write(n3, true, op);
1999
2000                 closure_sync(&op->cl);
2001                 bch_btree_set_root(n3);
2002                 rw_unlock(true, n3);
2003         } else if (root) {
2004                 op->keys.top = op->keys.bottom;
2005                 closure_sync(&op->cl);
2006                 bch_btree_set_root(n1);
2007         } else {
2008                 unsigned i;
2009
2010                 bkey_copy(op->keys.top, &b->key);
2011                 bkey_copy_key(op->keys.top, &ZERO_KEY);
2012
2013                 for (i = 0; i < KEY_PTRS(&b->key); i++) {
2014                         uint8_t g = PTR_BUCKET(b->c, &b->key, i)->gen + 1;
2015
2016                         SET_PTR_GEN(op->keys.top, i, g);
2017                 }
2018
2019                 bch_keylist_push(&op->keys);
2020                 closure_sync(&op->cl);
2021                 atomic_inc(&b->c->prio_blocked);
2022         }
2023
2024         rw_unlock(true, n1);
2025         btree_node_free(b, op);
2026
2027         bch_time_stats_update(&b->c->btree_split_time, start_time);
2028
2029         return 0;
2030 err_free2:
2031         __bkey_put(n2->c, &n2->key);
2032         btree_node_free(n2, op);
2033         rw_unlock(true, n2);
2034 err_free1:
2035         __bkey_put(n1->c, &n1->key);
2036         btree_node_free(n1, op);
2037         rw_unlock(true, n1);
2038 err:
2039         if (n3 == ERR_PTR(-EAGAIN) ||
2040             n2 == ERR_PTR(-EAGAIN) ||
2041             n1 == ERR_PTR(-EAGAIN))
2042                 return -EAGAIN;
2043
2044         pr_warn("couldn't split");
2045         return -ENOMEM;
2046 }
2047
2048 static int bch_btree_insert_recurse(struct btree *b, struct btree_op *op,
2049                                     struct keylist *stack_keys)
2050 {
2051         if (b->level) {
2052                 int ret;
2053                 struct bkey *insert = op->keys.bottom;
2054                 struct bkey *k = bch_next_recurse_key(b, &START_KEY(insert));
2055
2056                 if (!k) {
2057                         btree_bug(b, "no key to recurse on at level %i/%i",
2058                                   b->level, b->c->root->level);
2059
2060                         op->keys.top = op->keys.bottom;
2061                         return -EIO;
2062                 }
2063
2064                 if (bkey_cmp(insert, k) > 0) {
2065                         unsigned i;
2066
2067                         if (op->type == BTREE_REPLACE) {
2068                                 __bkey_put(b->c, insert);
2069                                 op->keys.top = op->keys.bottom;
2070                                 op->insert_collision = true;
2071                                 return 0;
2072                         }
2073
2074                         for (i = 0; i < KEY_PTRS(insert); i++)
2075                                 atomic_inc(&PTR_BUCKET(b->c, insert, i)->pin);
2076
2077                         bkey_copy(stack_keys->top, insert);
2078
2079                         bch_cut_back(k, insert);
2080                         bch_cut_front(k, stack_keys->top);
2081
2082                         bch_keylist_push(stack_keys);
2083                 }
2084
2085                 ret = btree(insert_recurse, k, b, op, stack_keys);
2086                 if (ret)
2087                         return ret;
2088         }
2089
2090         if (!bch_keylist_empty(&op->keys)) {
2091                 if (should_split(b)) {
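                        /*
                         * A split inserts into the parent and can cascade
                         * upwards; if we aren't already holding write locks
                         * high enough, raise op->lock and return -EINTR so
                         * the traversal is retried with the locks a
                         * cascading split needs.
                         */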
2092                         if (op->lock <= b->c->root->level) {
2093                                 BUG_ON(b->level);
2094                                 op->lock = b->c->root->level + 1;
2095                                 return -EINTR;
2096                         }
2097                         return btree_split(b, op);
2098                 }
2099
2100                 BUG_ON(write_block(b) != b->sets[b->nsets].data);
2101
2102                 if (bch_btree_insert_keys(b, op))
2103                         bch_btree_write(b, false, op);
2104         }
2105
2106         return 0;
2107 }
2108
2109 int bch_btree_insert(struct btree_op *op, struct cache_set *c)
2110 {
2111         int ret = 0;
2112         struct keylist stack_keys;
2113
2114         /*
2115          * Don't want to block with the btree locked unless we have to,
2116          * otherwise we get deadlocks with try_harder and between split/gc
2117          */
2118         clear_closure_blocking(&op->cl);
2119
2120         BUG_ON(bch_keylist_empty(&op->keys));
2121         bch_keylist_copy(&stack_keys, &op->keys);
2122         bch_keylist_init(&op->keys);
2123
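        /*
         * Keys that straddle an interior node boundary get cut in two down in
         * bch_btree_insert_recurse(); the tail ends up on stack_keys and is
         * fed back through op->keys by this loop until both lists drain.
         */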
2124         while (!bch_keylist_empty(&stack_keys) ||
2125                !bch_keylist_empty(&op->keys)) {
2126                 if (bch_keylist_empty(&op->keys)) {
2127                         bch_keylist_add(&op->keys,
2128                                         bch_keylist_pop(&stack_keys));
2129                         op->lock = 0;
2130                 }
2131
2132                 ret = btree_root(insert_recurse, c, op, &stack_keys);
2133
2134                 if (ret == -EAGAIN) {
2135                         ret = 0;
2136                         closure_sync(&op->cl);
2137                 } else if (ret) {
2138                         struct bkey *k;
2139
2140                         pr_err("error %i trying to insert key for %s",
2141                                ret, op_type(op));
2142
2143                         while ((k = bch_keylist_pop(&stack_keys) ?:
2144                                     bch_keylist_pop(&op->keys)))
2145                                 bkey_put(c, k, 0);
2146                 }
2147         }
2148
2149         bch_keylist_free(&stack_keys);
2150
2151         if (op->journal)
2152                 atomic_dec_bug(op->journal);
2153         op->journal = NULL;
2154         return ret;
2155 }
2156
2157 void bch_btree_set_root(struct btree *b)
2158 {
2159         unsigned i;
2160         struct closure cl;
2161
2162         closure_init_stack(&cl);
2163
2164         BUG_ON(!b->written);
2165
2166         for (i = 0; i < KEY_PTRS(&b->key); i++)
2167                 BUG_ON(PTR_BUCKET(b->c, &b->key, i)->prio != BTREE_PRIO);
2168
2169         mutex_lock(&b->c->bucket_lock);
2170         list_del_init(&b->list);
2171         mutex_unlock(&b->c->bucket_lock);
2172
2173         b->c->root = b;
2174         __bkey_put(b->c, &b->key);
2175
2176         bch_journal_meta(b->c, &cl);
2177         pr_debug("%s for %pf", pbtree(b), __builtin_return_address(0));
2178         closure_sync(&cl);
2179 }
2180
2181 /* Cache lookup */
2182
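/*
 * The lookup path walks the extents covering the bio: gaps before the next
 * key are handed to the backing device via cache_miss(), and covered ranges
 * are split off and submitted to the cache device with the sector remapped
 * through the key's pointer.
 */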
2183 static int submit_partial_cache_miss(struct btree *b, struct btree_op *op,
2184                                      struct bkey *k)
2185 {
2186         struct search *s = container_of(op, struct search, op);
2187         struct bio *bio = &s->bio.bio;
2188         int ret = 0;
2189
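        /*
         * Hand the part of the bio before @k (or all of it, if @k belongs to
         * a different inode) to cache_miss(); loop because cache_miss() may
         * only consume part of the gap at a time.
         */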
2190         while (!ret &&
2191                !op->lookup_done) {
2192                 unsigned sectors = INT_MAX;
2193
2194                 if (KEY_INODE(k) == op->inode) {
2195                         if (KEY_START(k) <= bio->bi_sector)
2196                                 break;
2197
2198                         sectors = min_t(uint64_t, sectors,
2199                                         KEY_START(k) - bio->bi_sector);
2200                 }
2201
2202                 ret = s->d->cache_miss(b, s, bio, sectors);
2203         }
2204
2205         return ret;
2206 }
2207
2208 /*
2209  * Read from a single key, handling the initial cache miss if the key starts in
2210  * the middle of the bio
2211  */
2212 static int submit_partial_cache_hit(struct btree *b, struct btree_op *op,
2213                                     struct bkey *k)
2214 {
2215         struct search *s = container_of(op, struct search, op);
2216         struct bio *bio = &s->bio.bio;
2217         unsigned ptr;
2218         struct bio *n;
2219
2220         int ret = submit_partial_cache_miss(b, op, k);
2221         if (ret || op->lookup_done)
2222                 return ret;
2223
2224         /* XXX: figure out best pointer - for multiple cache devices */
2225         ptr = 0;
2226
2227         PTR_BUCKET(b->c, k, ptr)->prio = INITIAL_PRIO;
2228
2229         while (!op->lookup_done &&
2230                KEY_INODE(k) == op->inode &&
2231                bio->bi_sector < KEY_OFFSET(k)) {
2232                 struct bkey *bio_key;
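                /*
                 * Remap to the cache device: the extent's start on the cache
                 * (PTR_OFFSET) plus how far into the extent the bio currently
                 * is.
                 */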
2233                 sector_t sector = PTR_OFFSET(k, ptr) +
2234                         (bio->bi_sector - KEY_START(k));
2235                 unsigned sectors = min_t(uint64_t, INT_MAX,
2236                                          KEY_OFFSET(k) - bio->bi_sector);
2237
2238                 n = bch_bio_split(bio, sectors, GFP_NOIO, s->d->bio_split);
2239                 if (!n)
2240                         return -EAGAIN;
2241
2242                 if (n == bio)
2243                         op->lookup_done = true;
2244
2245                 bio_key = &container_of(n, struct bbio, bio)->key;
2246
2247                 /*
2248                  * The bucket we're reading from might be reused while our bio
2249                  * is in flight, and we could then end up reading the wrong
2250                  * data.
2251                  *
2252                  * We guard against this by checking (in cache_read_endio()) if
2253                  * the pointer is stale again; if so, we treat it as an error
2254                  * and reread from the backing device (but we don't pass that
2255                  * error up anywhere).
2256                  */
2257
2258                 bch_bkey_copy_single_ptr(bio_key, k, ptr);
2259                 SET_PTR_OFFSET(bio_key, 0, sector);
2260
2261                 n->bi_end_io    = bch_cache_read_endio;
2262                 n->bi_private   = &s->cl;
2263
2264                 trace_bcache_cache_hit(n);
2265                 __bch_submit_bbio(n, b->c);
2266         }
2267
2268         return 0;
2269 }
2270
2271 int bch_btree_search_recurse(struct btree *b, struct btree_op *op)
2272 {
2273         struct search *s = container_of(op, struct search, op);
2274         struct bio *bio = &s->bio.bio;
2275
2276         int ret = 0;
2277         struct bkey *k;
2278         struct btree_iter iter;
2279         bch_btree_iter_init(b, &iter, &KEY(op->inode, bio->bi_sector, 0));
2280
2281         pr_debug("at %s searching for %u:%llu", pbtree(b), op->inode,
2282                  (uint64_t) bio->bi_sector);
2283
2284         do {
2285                 k = bch_btree_iter_next_filter(&iter, b, bch_ptr_bad);
2286                 if (!k) {
2287                         /*
2288                          * b->key would be exactly what we want, except that
2289                          * pointers to btree nodes have nonzero size - we
2290                          * wouldn't go far enough
2291                          */
2292
2293                         ret = submit_partial_cache_miss(b, op,
2294                                         &KEY(KEY_INODE(&b->key),
2295                                              KEY_OFFSET(&b->key), 0));
2296                         break;
2297                 }
2298
2299                 ret = b->level
2300                         ? btree(search_recurse, k, b, op)
2301                         : submit_partial_cache_hit(b, op, k);
2302         } while (!ret &&
2303                  !op->lookup_done);
2304
2305         return ret;
2306 }
2307
2308 /* Keybuf code */
2309
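/*
 * A keybuf holds keys matching buf->key_predicate, harvested from the btree
 * starting at buf->last_scanned and kept in an RB tree; consumers such as
 * writeback pull keys out with bch_keybuf_next()/bch_keybuf_next_rescan()
 * and release them with bch_keybuf_del().
 */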
2310 static inline int keybuf_cmp(struct keybuf_key *l, struct keybuf_key *r)
2311 {
2312         /* Overlapping keys compare equal, so RB_INSERT() treats anything that overlaps a buffered key as a duplicate */
2313         if (bkey_cmp(&l->key, &START_KEY(&r->key)) <= 0)
2314                 return -1;
2315         if (bkey_cmp(&START_KEY(&l->key), &r->key) >= 0)
2316                 return 1;
2317         return 0;
2318 }
2319
2320 static inline int keybuf_nonoverlapping_cmp(struct keybuf_key *l,
2321                                             struct keybuf_key *r)
2322 {
2323         return clamp_t(int64_t, bkey_cmp(&l->key, &r->key), -1, 1);
2324 }
2325
2326 static int bch_btree_refill_keybuf(struct btree *b, struct btree_op *op,
2327                                    struct keybuf *buf, struct bkey *end)
2328 {
2329         struct btree_iter iter;
2330         bch_btree_iter_init(b, &iter, &buf->last_scanned);
2331
2332         while (!array_freelist_empty(&buf->freelist)) {
2333                 struct bkey *k = bch_btree_iter_next_filter(&iter, b,
2334                                                             bch_ptr_bad);
2335
2336                 if (!b->level) {
2337                         if (!k) {
2338                                 buf->last_scanned = b->key;
2339                                 break;
2340                         }
2341
2342                         buf->last_scanned = *k;
2343                         if (bkey_cmp(&buf->last_scanned, end) >= 0)
2344                                 break;
2345
2346                         if (buf->key_predicate(buf, k)) {
2347                                 struct keybuf_key *w;
2348
2349                                 pr_debug("%s", pkey(k));
2350
2351                                 spin_lock(&buf->lock);
2352
2353                                 w = array_alloc(&buf->freelist);
2354
2355                                 w->private = NULL;
2356                                 bkey_copy(&w->key, k);
2357
2358                                 if (RB_INSERT(&buf->keys, w, node, keybuf_cmp))
2359                                         array_free(&buf->freelist, w);
2360
2361                                 spin_unlock(&buf->lock);
2362                         }
2363                 } else {
2364                         if (!k)
2365                                 break;
2366
2367                         btree(refill_keybuf, k, b, op, buf, end);
2368                         /*
2369                          * Might get an error here, but can't really do anything
2370                          * and it'll get logged elsewhere. Just read what we
2371                          * can.
2372                          */
2373
2374                         if (bkey_cmp(&buf->last_scanned, end) >= 0)
2375                                 break;
2376
2377                         cond_resched();
2378                 }
2379         }
2380
2381         return 0;
2382 }
2383
2384 void bch_refill_keybuf(struct cache_set *c, struct keybuf *buf,
2385                           struct bkey *end)
2386 {
2387         struct bkey start = buf->last_scanned;
2388         struct btree_op op;
2389         bch_btree_op_init_stack(&op);
2390
2391         cond_resched();
2392
2393         btree_root(refill_keybuf, c, &op, buf, end);
2394         closure_sync(&op.cl);
2395
2396         pr_debug("found %s keys from %llu:%llu to %llu:%llu",
2397                  RB_EMPTY_ROOT(&buf->keys) ? "no" :
2398                  array_freelist_empty(&buf->freelist) ? "some" : "a few",
2399                  KEY_INODE(&start), KEY_OFFSET(&start),
2400                  KEY_INODE(&buf->last_scanned), KEY_OFFSET(&buf->last_scanned));
2401
2402         spin_lock(&buf->lock);
2403
2404         if (!RB_EMPTY_ROOT(&buf->keys)) {
2405                 struct keybuf_key *w;
2406                 w = RB_FIRST(&buf->keys, struct keybuf_key, node);
2407                 buf->start      = START_KEY(&w->key);
2408
2409                 w = RB_LAST(&buf->keys, struct keybuf_key, node);
2410                 buf->end        = w->key;
2411         } else {
2412                 buf->start      = MAX_KEY;
2413                 buf->end        = MAX_KEY;
2414         }
2415
2416         spin_unlock(&buf->lock);
2417 }
2418
2419 static void __bch_keybuf_del(struct keybuf *buf, struct keybuf_key *w)
2420 {
2421         rb_erase(&w->node, &buf->keys);
2422         array_free(&buf->freelist, w);
2423 }
2424
2425 void bch_keybuf_del(struct keybuf *buf, struct keybuf_key *w)
2426 {
2427         spin_lock(&buf->lock);
2428         __bch_keybuf_del(buf, w);
2429         spin_unlock(&buf->lock);
2430 }
2431
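/*
 * Returns true if any buffered key overlapping [start, end) is currently in
 * use (w->private set); overlapping keys that aren't in use are simply
 * dropped from the buffer.
 */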
2432 bool bch_keybuf_check_overlapping(struct keybuf *buf, struct bkey *start,
2433                                   struct bkey *end)
2434 {
2435         bool ret = false;
2436         struct keybuf_key *p, *w, s;
2437         s.key = *start;
2438
2439         if (bkey_cmp(end, &buf->start) <= 0 ||
2440             bkey_cmp(start, &buf->end) >= 0)
2441                 return false;
2442
2443         spin_lock(&buf->lock);
2444         w = RB_GREATER(&buf->keys, s, node, keybuf_nonoverlapping_cmp);
2445
2446         while (w && bkey_cmp(&START_KEY(&w->key), end) < 0) {
2447                 p = w;
2448                 w = RB_NEXT(w, node);
2449
2450                 if (p->private)
2451                         ret = true;
2452                 else
2453                         __bch_keybuf_del(buf, p);
2454         }
2455
2456         spin_unlock(&buf->lock);
2457         return ret;
2458 }
2459
2460 struct keybuf_key *bch_keybuf_next(struct keybuf *buf)
2461 {
2462         struct keybuf_key *w;
2463         spin_lock(&buf->lock);
2464
2465         w = RB_FIRST(&buf->keys, struct keybuf_key, node);
2466
2467         while (w && w->private)
2468                 w = RB_NEXT(w, node);
2469
2470         if (w)
2471                 w->private = ERR_PTR(-EINTR);
2472
2473         spin_unlock(&buf->lock);
2474         return w;
2475 }
2476
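/*
 * Like bch_keybuf_next(), but refills the buffer from the btree until it
 * either finds an unused key or the scan reaches @end.
 */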
2477 struct keybuf_key *bch_keybuf_next_rescan(struct cache_set *c,
2478                                              struct keybuf *buf,
2479                                              struct bkey *end)
2480 {
2481         struct keybuf_key *ret;
2482
2483         while (1) {
2484                 ret = bch_keybuf_next(buf);
2485                 if (ret)
2486                         break;
2487
2488                 if (bkey_cmp(&buf->last_scanned, end) >= 0) {
2489                         pr_debug("scan finished");
2490                         break;
2491                 }
2492
2493                 bch_refill_keybuf(c, buf, end);
2494         }
2495
2496         return ret;
2497 }
2498
2499 void bch_keybuf_init(struct keybuf *buf, keybuf_pred_fn *fn)
2500 {
2501         buf->key_predicate      = fn;
2502         buf->last_scanned       = MAX_KEY;
2503         buf->keys               = RB_ROOT;
2504
2505         spin_lock_init(&buf->lock);
2506         array_allocator_init(&buf->freelist);
2507 }
2508
2509 void bch_btree_exit(void)
2510 {
2511         if (btree_io_wq)
2512                 destroy_workqueue(btree_io_wq);
2513         if (bch_gc_wq)
2514                 destroy_workqueue(bch_gc_wq);
2515 }
2516
2517 int __init bch_btree_init(void)
2518 {
2519         if (!(bch_gc_wq = create_singlethread_workqueue("bch_btree_gc")) ||
2520             !(btree_io_wq = create_singlethread_workqueue("bch_btree_io")))
2521                 return -ENOMEM;
2522
2523         return 0;
2524 }