Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * bcache journalling code, for btree insertions
0004  *
0005  * Copyright 2012 Google, Inc.
0006  */
0007 
0008 #include "bcache.h"
0009 #include "btree.h"
0010 #include "debug.h"
0011 #include "extents.h"
0012 
0013 #include <trace/events/bcache.h>
0014 
0015 /*
0016  * Journal replay/recovery:
0017  *
0018  * This code is all driven from run_cache_set(); we first read the journal
0019  * entries, do some other stuff, then we mark all the keys in the journal
0020  * entries (same as garbage collection would), then we replay them - reinserting
0021  * them into the cache in precisely the same order as they appear in the
0022  * journal.
0023  *
0024  * We only journal keys that go in leaf nodes, which simplifies things quite a
0025  * bit.
0026  */
0027 
0028 static void journal_read_endio(struct bio *bio)
0029 {
0030     struct closure *cl = bio->bi_private;
0031 
0032     closure_put(cl);
0033 }
0034 
0035 static int journal_read_bucket(struct cache *ca, struct list_head *list,
0036                    unsigned int bucket_index)
0037 {
0038     struct journal_device *ja = &ca->journal;
0039     struct bio *bio = &ja->bio;
0040 
0041     struct journal_replay *i;
0042     struct jset *j, *data = ca->set->journal.w[0].data;
0043     struct closure cl;
0044     unsigned int len, left, offset = 0;
0045     int ret = 0;
0046     sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
0047 
0048     closure_init_stack(&cl);
0049 
0050     pr_debug("reading %u\n", bucket_index);
0051 
0052     while (offset < ca->sb.bucket_size) {
0053 reread:     left = ca->sb.bucket_size - offset;
0054         len = min_t(unsigned int, left, PAGE_SECTORS << JSET_BITS);
0055 
0056         bio_reset(bio, ca->bdev, REQ_OP_READ);
0057         bio->bi_iter.bi_sector  = bucket + offset;
0058         bio->bi_iter.bi_size    = len << 9;
0059 
0060         bio->bi_end_io  = journal_read_endio;
0061         bio->bi_private = &cl;
0062         bch_bio_map(bio, data);
0063 
0064         closure_bio_submit(ca->set, bio, &cl);
0065         closure_sync(&cl);
0066 
0067         /* This function could be simpler now since we no longer write
0068          * journal entries that overlap bucket boundaries; this means
0069          * the start of a bucket will always have a valid journal entry
0070          * if it has any journal entries at all.
0071          */
0072 
0073         j = data;
0074         while (len) {
0075             struct list_head *where;
0076             size_t blocks, bytes = set_bytes(j);
0077 
0078             if (j->magic != jset_magic(&ca->sb)) {
0079                 pr_debug("%u: bad magic\n", bucket_index);
0080                 return ret;
0081             }
0082 
0083             if (bytes > left << 9 ||
0084                 bytes > PAGE_SIZE << JSET_BITS) {
0085                 pr_info("%u: too big, %zu bytes, offset %u\n",
0086                     bucket_index, bytes, offset);
0087                 return ret;
0088             }
0089 
0090             if (bytes > len << 9)
0091                 goto reread;
0092 
0093             if (j->csum != csum_set(j)) {
0094                 pr_info("%u: bad csum, %zu bytes, offset %u\n",
0095                     bucket_index, bytes, offset);
0096                 return ret;
0097             }
0098 
0099             blocks = set_blocks(j, block_bytes(ca));
0100 
0101             /*
0102              * Nodes in 'list' are in linear increasing order of
0103              * i->j.seq, the node on head has the smallest (oldest)
0104              * journal seq, the node on tail has the biggest
0105              * (latest) journal seq.
0106              */
0107 
0108             /*
0109              * Check from the oldest jset for last_seq. If
0110              * i->j.seq < j->last_seq, it means the oldest jset
0111              * in list is expired and useless, remove it from
0112              * this list. Otherwise, j is a candidate jset for
0113              * further following checks.
0114              */
0115             while (!list_empty(list)) {
0116                 i = list_first_entry(list,
0117                     struct journal_replay, list);
0118                 if (i->j.seq >= j->last_seq)
0119                     break;
0120                 list_del(&i->list);
0121                 kfree(i);
0122             }
0123 
0124             /* iterate list in reverse order (from latest jset) */
0125             list_for_each_entry_reverse(i, list, list) {
0126                 if (j->seq == i->j.seq)
0127                     goto next_set;
0128 
0129                 /*
0130                  * if j->seq is less than any i->j.last_seq
0131                  * in list, j is an expired and useless jset.
0132                  */
0133                 if (j->seq < i->j.last_seq)
0134                     goto next_set;
0135 
0136                 /*
0137                  * 'where' points to first jset in list which
0138                  * is elder then j.
0139                  */
0140                 if (j->seq > i->j.seq) {
0141                     where = &i->list;
0142                     goto add;
0143                 }
0144             }
0145 
0146             where = list;
0147 add:
0148             i = kmalloc(offsetof(struct journal_replay, j) +
0149                     bytes, GFP_KERNEL);
0150             if (!i)
0151                 return -ENOMEM;
0152             memcpy(&i->j, j, bytes);
0153             /* Add to the location after 'where' points to */
0154             list_add(&i->list, where);
0155             ret = 1;
0156 
0157             if (j->seq > ja->seq[bucket_index])
0158                 ja->seq[bucket_index] = j->seq;
0159 next_set:
0160             offset  += blocks * ca->sb.block_size;
0161             len -= blocks * ca->sb.block_size;
0162             j = ((void *) j) + blocks * block_bytes(ca);
0163         }
0164     }
0165 
0166     return ret;
0167 }
0168 
0169 int bch_journal_read(struct cache_set *c, struct list_head *list)
0170 {
0171 #define read_bucket(b)                          \
0172     ({                              \
0173         ret = journal_read_bucket(ca, list, b);         \
0174         __set_bit(b, bitmap);                   \
0175         if (ret < 0)                        \
0176             return ret;                 \
0177         ret;                            \
0178     })
0179 
0180     struct cache *ca = c->cache;
0181     int ret = 0;
0182     struct journal_device *ja = &ca->journal;
0183     DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
0184     unsigned int i, l, r, m;
0185     uint64_t seq;
0186 
0187     bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
0188     pr_debug("%u journal buckets\n", ca->sb.njournal_buckets);
0189 
0190     /*
0191      * Read journal buckets ordered by golden ratio hash to quickly
0192      * find a sequence of buckets with valid journal entries
0193      */
0194     for (i = 0; i < ca->sb.njournal_buckets; i++) {
0195         /*
0196          * We must try the index l with ZERO first for
0197          * correctness due to the scenario that the journal
0198          * bucket is circular buffer which might have wrapped
0199          */
0200         l = (i * 2654435769U) % ca->sb.njournal_buckets;
0201 
0202         if (test_bit(l, bitmap))
0203             break;
0204 
0205         if (read_bucket(l))
0206             goto bsearch;
0207     }
0208 
0209     /*
0210      * If that fails, check all the buckets we haven't checked
0211      * already
0212      */
0213     pr_debug("falling back to linear search\n");
0214 
0215     for_each_clear_bit(l, bitmap, ca->sb.njournal_buckets)
0216         if (read_bucket(l))
0217             goto bsearch;
0218 
0219     /* no journal entries on this device? */
0220     if (l == ca->sb.njournal_buckets)
0221         goto out;
0222 bsearch:
0223     BUG_ON(list_empty(list));
0224 
0225     /* Binary search */
0226     m = l;
0227     r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
0228     pr_debug("starting binary search, l %u r %u\n", l, r);
0229 
0230     while (l + 1 < r) {
0231         seq = list_entry(list->prev, struct journal_replay,
0232                  list)->j.seq;
0233 
0234         m = (l + r) >> 1;
0235         read_bucket(m);
0236 
0237         if (seq != list_entry(list->prev, struct journal_replay,
0238                       list)->j.seq)
0239             l = m;
0240         else
0241             r = m;
0242     }
0243 
0244     /*
0245      * Read buckets in reverse order until we stop finding more
0246      * journal entries
0247      */
0248     pr_debug("finishing up: m %u njournal_buckets %u\n",
0249          m, ca->sb.njournal_buckets);
0250     l = m;
0251 
0252     while (1) {
0253         if (!l--)
0254             l = ca->sb.njournal_buckets - 1;
0255 
0256         if (l == m)
0257             break;
0258 
0259         if (test_bit(l, bitmap))
0260             continue;
0261 
0262         if (!read_bucket(l))
0263             break;
0264     }
0265 
0266     seq = 0;
0267 
0268     for (i = 0; i < ca->sb.njournal_buckets; i++)
0269         if (ja->seq[i] > seq) {
0270             seq = ja->seq[i];
0271             /*
0272              * When journal_reclaim() goes to allocate for
0273              * the first time, it'll use the bucket after
0274              * ja->cur_idx
0275              */
0276             ja->cur_idx = i;
0277             ja->last_idx = ja->discard_idx = (i + 1) %
0278                 ca->sb.njournal_buckets;
0279 
0280         }
0281 
0282 out:
0283     if (!list_empty(list))
0284         c->journal.seq = list_entry(list->prev,
0285                         struct journal_replay,
0286                         list)->j.seq;
0287 
0288     return 0;
0289 #undef read_bucket
0290 }
0291 
0292 void bch_journal_mark(struct cache_set *c, struct list_head *list)
0293 {
0294     atomic_t p = { 0 };
0295     struct bkey *k;
0296     struct journal_replay *i;
0297     struct journal *j = &c->journal;
0298     uint64_t last = j->seq;
0299 
0300     /*
0301      * journal.pin should never fill up - we never write a journal
0302      * entry when it would fill up. But if for some reason it does, we
0303      * iterate over the list in reverse order so that we can just skip that
0304      * refcount instead of bugging.
0305      */
0306 
0307     list_for_each_entry_reverse(i, list, list) {
0308         BUG_ON(last < i->j.seq);
0309         i->pin = NULL;
0310 
0311         while (last-- != i->j.seq)
0312             if (fifo_free(&j->pin) > 1) {
0313                 fifo_push_front(&j->pin, p);
0314                 atomic_set(&fifo_front(&j->pin), 0);
0315             }
0316 
0317         if (fifo_free(&j->pin) > 1) {
0318             fifo_push_front(&j->pin, p);
0319             i->pin = &fifo_front(&j->pin);
0320             atomic_set(i->pin, 1);
0321         }
0322 
0323         for (k = i->j.start;
0324              k < bset_bkey_last(&i->j);
0325              k = bkey_next(k))
0326             if (!__bch_extent_invalid(c, k)) {
0327                 unsigned int j;
0328 
0329                 for (j = 0; j < KEY_PTRS(k); j++)
0330                     if (ptr_available(c, k, j))
0331                         atomic_inc(&PTR_BUCKET(c, k, j)->pin);
0332 
0333                 bch_initial_mark_key(c, 0, k);
0334             }
0335     }
0336 }
0337 
0338 static bool is_discard_enabled(struct cache_set *s)
0339 {
0340     struct cache *ca = s->cache;
0341 
0342     if (ca->discard)
0343         return true;
0344 
0345     return false;
0346 }
0347 
0348 int bch_journal_replay(struct cache_set *s, struct list_head *list)
0349 {
0350     int ret = 0, keys = 0, entries = 0;
0351     struct bkey *k;
0352     struct journal_replay *i =
0353         list_entry(list->prev, struct journal_replay, list);
0354 
0355     uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
0356     struct keylist keylist;
0357 
0358     list_for_each_entry(i, list, list) {
0359         BUG_ON(i->pin && atomic_read(i->pin) != 1);
0360 
0361         if (n != i->j.seq) {
0362             if (n == start && is_discard_enabled(s))
0363                 pr_info("journal entries %llu-%llu may be discarded! (replaying %llu-%llu)\n",
0364                     n, i->j.seq - 1, start, end);
0365             else {
0366                 pr_err("journal entries %llu-%llu missing! (replaying %llu-%llu)\n",
0367                     n, i->j.seq - 1, start, end);
0368                 ret = -EIO;
0369                 goto err;
0370             }
0371         }
0372 
0373         for (k = i->j.start;
0374              k < bset_bkey_last(&i->j);
0375              k = bkey_next(k)) {
0376             trace_bcache_journal_replay_key(k);
0377 
0378             bch_keylist_init_single(&keylist, k);
0379 
0380             ret = bch_btree_insert(s, &keylist, i->pin, NULL);
0381             if (ret)
0382                 goto err;
0383 
0384             BUG_ON(!bch_keylist_empty(&keylist));
0385             keys++;
0386 
0387             cond_resched();
0388         }
0389 
0390         if (i->pin)
0391             atomic_dec(i->pin);
0392         n = i->j.seq + 1;
0393         entries++;
0394     }
0395 
0396     pr_info("journal replay done, %i keys in %i entries, seq %llu\n",
0397         keys, entries, end);
0398 err:
0399     while (!list_empty(list)) {
0400         i = list_first_entry(list, struct journal_replay, list);
0401         list_del(&i->list);
0402         kfree(i);
0403     }
0404 
0405     return ret;
0406 }
0407 
0408 void bch_journal_space_reserve(struct journal *j)
0409 {
0410     j->do_reserve = true;
0411 }
0412 
0413 /* Journalling */
0414 
0415 static void btree_flush_write(struct cache_set *c)
0416 {
0417     struct btree *b, *t, *btree_nodes[BTREE_FLUSH_NR];
0418     unsigned int i, nr;
0419     int ref_nr;
0420     atomic_t *fifo_front_p, *now_fifo_front_p;
0421     size_t mask;
0422 
0423     if (c->journal.btree_flushing)
0424         return;
0425 
0426     spin_lock(&c->journal.flush_write_lock);
0427     if (c->journal.btree_flushing) {
0428         spin_unlock(&c->journal.flush_write_lock);
0429         return;
0430     }
0431     c->journal.btree_flushing = true;
0432     spin_unlock(&c->journal.flush_write_lock);
0433 
0434     /* get the oldest journal entry and check its refcount */
0435     spin_lock(&c->journal.lock);
0436     fifo_front_p = &fifo_front(&c->journal.pin);
0437     ref_nr = atomic_read(fifo_front_p);
0438     if (ref_nr <= 0) {
0439         /*
0440          * do nothing if no btree node references
0441          * the oldest journal entry
0442          */
0443         spin_unlock(&c->journal.lock);
0444         goto out;
0445     }
0446     spin_unlock(&c->journal.lock);
0447 
0448     mask = c->journal.pin.mask;
0449     nr = 0;
0450     atomic_long_inc(&c->flush_write);
0451     memset(btree_nodes, 0, sizeof(btree_nodes));
0452 
0453     mutex_lock(&c->bucket_lock);
0454     list_for_each_entry_safe_reverse(b, t, &c->btree_cache, list) {
0455         /*
0456          * It is safe to get now_fifo_front_p without holding
0457          * c->journal.lock here, because we don't need to know
0458          * the exactly accurate value, just check whether the
0459          * front pointer of c->journal.pin is changed.
0460          */
0461         now_fifo_front_p = &fifo_front(&c->journal.pin);
0462         /*
0463          * If the oldest journal entry is reclaimed and front
0464          * pointer of c->journal.pin changes, it is unnecessary
0465          * to scan c->btree_cache anymore, just quit the loop and
0466          * flush out what we have already.
0467          */
0468         if (now_fifo_front_p != fifo_front_p)
0469             break;
0470         /*
0471          * quit this loop if all matching btree nodes are
0472          * scanned and record in btree_nodes[] already.
0473          */
0474         ref_nr = atomic_read(fifo_front_p);
0475         if (nr >= ref_nr)
0476             break;
0477 
0478         if (btree_node_journal_flush(b))
0479             pr_err("BUG: flush_write bit should not be set here!\n");
0480 
0481         mutex_lock(&b->write_lock);
0482 
0483         if (!btree_node_dirty(b)) {
0484             mutex_unlock(&b->write_lock);
0485             continue;
0486         }
0487 
0488         if (!btree_current_write(b)->journal) {
0489             mutex_unlock(&b->write_lock);
0490             continue;
0491         }
0492 
0493         /*
0494          * Only select the btree node which exactly references
0495          * the oldest journal entry.
0496          *
0497          * If the journal entry pointed by fifo_front_p is
0498          * reclaimed in parallel, don't worry:
0499          * - the list_for_each_xxx loop will quit when checking
0500          *   next now_fifo_front_p.
0501          * - If there are matched nodes recorded in btree_nodes[],
0502          *   they are clean now (this is why and how the oldest
0503          *   journal entry can be reclaimed). These selected nodes
0504          *   will be ignored and skipped in the following for-loop.
0505          */
0506         if (((btree_current_write(b)->journal - fifo_front_p) &
0507              mask) != 0) {
0508             mutex_unlock(&b->write_lock);
0509             continue;
0510         }
0511 
0512         set_btree_node_journal_flush(b);
0513 
0514         mutex_unlock(&b->write_lock);
0515 
0516         btree_nodes[nr++] = b;
0517         /*
0518          * To avoid holding c->bucket_lock too long time,
0519          * only scan for BTREE_FLUSH_NR matched btree nodes
0520          * at most. If there are more btree nodes reference
0521          * the oldest journal entry, try to flush them next
0522          * time when btree_flush_write() is called.
0523          */
0524         if (nr == BTREE_FLUSH_NR)
0525             break;
0526     }
0527     mutex_unlock(&c->bucket_lock);
0528 
0529     for (i = 0; i < nr; i++) {
0530         b = btree_nodes[i];
0531         if (!b) {
0532             pr_err("BUG: btree_nodes[%d] is NULL\n", i);
0533             continue;
0534         }
0535 
0536         /* safe to check without holding b->write_lock */
0537         if (!btree_node_journal_flush(b)) {
0538             pr_err("BUG: bnode %p: journal_flush bit cleaned\n", b);
0539             continue;
0540         }
0541 
0542         mutex_lock(&b->write_lock);
0543         if (!btree_current_write(b)->journal) {
0544             clear_bit(BTREE_NODE_journal_flush, &b->flags);
0545             mutex_unlock(&b->write_lock);
0546             pr_debug("bnode %p: written by others\n", b);
0547             continue;
0548         }
0549 
0550         if (!btree_node_dirty(b)) {
0551             clear_bit(BTREE_NODE_journal_flush, &b->flags);
0552             mutex_unlock(&b->write_lock);
0553             pr_debug("bnode %p: dirty bit cleaned by others\n", b);
0554             continue;
0555         }
0556 
0557         __bch_btree_node_write(b, NULL);
0558         clear_bit(BTREE_NODE_journal_flush, &b->flags);
0559         mutex_unlock(&b->write_lock);
0560     }
0561 
0562 out:
0563     spin_lock(&c->journal.flush_write_lock);
0564     c->journal.btree_flushing = false;
0565     spin_unlock(&c->journal.flush_write_lock);
0566 }
0567 
/*
 * Journal sequence number minus the number of elements in the pin fifo,
 * plus one.
 */
#define last_seq(j)	(((j)->seq - fifo_used(&(j)->pin)) + 1)
0569 
0570 static void journal_discard_endio(struct bio *bio)
0571 {
0572     struct journal_device *ja =
0573         container_of(bio, struct journal_device, discard_bio);
0574     struct cache *ca = container_of(ja, struct cache, journal);
0575 
0576     atomic_set(&ja->discard_in_flight, DISCARD_DONE);
0577 
0578     closure_wake_up(&ca->set->journal.wait);
0579     closure_put(&ca->set->cl);
0580 }
0581 
0582 static void journal_discard_work(struct work_struct *work)
0583 {
0584     struct journal_device *ja =
0585         container_of(work, struct journal_device, discard_work);
0586 
0587     submit_bio(&ja->discard_bio);
0588 }
0589 
0590 static void do_journal_discard(struct cache *ca)
0591 {
0592     struct journal_device *ja = &ca->journal;
0593     struct bio *bio = &ja->discard_bio;
0594 
0595     if (!ca->discard) {
0596         ja->discard_idx = ja->last_idx;
0597         return;
0598     }
0599 
0600     switch (atomic_read(&ja->discard_in_flight)) {
0601     case DISCARD_IN_FLIGHT:
0602         return;
0603 
0604     case DISCARD_DONE:
0605         ja->discard_idx = (ja->discard_idx + 1) %
0606             ca->sb.njournal_buckets;
0607 
0608         atomic_set(&ja->discard_in_flight, DISCARD_READY);
0609         fallthrough;
0610 
0611     case DISCARD_READY:
0612         if (ja->discard_idx == ja->last_idx)
0613             return;
0614 
0615         atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);
0616 
0617         bio_init(bio, ca->bdev, bio->bi_inline_vecs, 1, REQ_OP_DISCARD);
0618         bio->bi_iter.bi_sector  = bucket_to_sector(ca->set,
0619                         ca->sb.d[ja->discard_idx]);
0620         bio->bi_iter.bi_size    = bucket_bytes(ca);
0621         bio->bi_end_io      = journal_discard_endio;
0622 
0623         closure_get(&ca->set->cl);
0624         INIT_WORK(&ja->discard_work, journal_discard_work);
0625         queue_work(bch_journal_wq, &ja->discard_work);
0626     }
0627 }
0628 
0629 static unsigned int free_journal_buckets(struct cache_set *c)
0630 {
0631     struct journal *j = &c->journal;
0632     struct cache *ca = c->cache;
0633     struct journal_device *ja = &c->cache->journal;
0634     unsigned int n;
0635 
0636     /* In case njournal_buckets is not power of 2 */
0637     if (ja->cur_idx >= ja->discard_idx)
0638         n = ca->sb.njournal_buckets +  ja->discard_idx - ja->cur_idx;
0639     else
0640         n = ja->discard_idx - ja->cur_idx;
0641 
0642     if (n > (1 + j->do_reserve))
0643         return n - (1 + j->do_reserve);
0644 
0645     return 0;
0646 }
0647 
0648 static void journal_reclaim(struct cache_set *c)
0649 {
0650     struct bkey *k = &c->journal.key;
0651     struct cache *ca = c->cache;
0652     uint64_t last_seq;
0653     struct journal_device *ja = &ca->journal;
0654     atomic_t p __maybe_unused;
0655 
0656     atomic_long_inc(&c->reclaim);
0657 
0658     while (!atomic_read(&fifo_front(&c->journal.pin)))
0659         fifo_pop(&c->journal.pin, p);
0660 
0661     last_seq = last_seq(&c->journal);
0662 
0663     /* Update last_idx */
0664 
0665     while (ja->last_idx != ja->cur_idx &&
0666            ja->seq[ja->last_idx] < last_seq)
0667         ja->last_idx = (ja->last_idx + 1) %
0668             ca->sb.njournal_buckets;
0669 
0670     do_journal_discard(ca);
0671 
0672     if (c->journal.blocks_free)
0673         goto out;
0674 
0675     if (!free_journal_buckets(c))
0676         goto out;
0677 
0678     ja->cur_idx = (ja->cur_idx + 1) % ca->sb.njournal_buckets;
0679     k->ptr[0] = MAKE_PTR(0,
0680                  bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
0681                  ca->sb.nr_this_dev);
0682     atomic_long_inc(&c->reclaimed_journal_buckets);
0683 
0684     bkey_init(k);
0685     SET_KEY_PTRS(k, 1);
0686     c->journal.blocks_free = ca->sb.bucket_size >> c->block_bits;
0687 
0688 out:
0689     if (!journal_full(&c->journal))
0690         __closure_wake_up(&c->journal.wait);
0691 }
0692 
0693 void bch_journal_next(struct journal *j)
0694 {
0695     atomic_t p = { 1 };
0696 
0697     j->cur = (j->cur == j->w)
0698         ? &j->w[1]
0699         : &j->w[0];
0700 
0701     /*
0702      * The fifo_push() needs to happen at the same time as j->seq is
0703      * incremented for last_seq() to be calculated correctly
0704      */
0705     BUG_ON(!fifo_push(&j->pin, p));
0706     atomic_set(&fifo_back(&j->pin), 1);
0707 
0708     j->cur->data->seq   = ++j->seq;
0709     j->cur->dirty       = false;
0710     j->cur->need_write  = false;
0711     j->cur->data->keys  = 0;
0712 
0713     if (fifo_full(&j->pin))
0714         pr_debug("journal_pin full (%zu)\n", fifo_used(&j->pin));
0715 }
0716 
0717 static void journal_write_endio(struct bio *bio)
0718 {
0719     struct journal_write *w = bio->bi_private;
0720 
0721     cache_set_err_on(bio->bi_status, w->c, "journal io error");
0722     closure_put(&w->c->journal.io);
0723 }
0724 
0725 static void journal_write(struct closure *cl);
0726 
0727 static void journal_write_done(struct closure *cl)
0728 {
0729     struct journal *j = container_of(cl, struct journal, io);
0730     struct journal_write *w = (j->cur == j->w)
0731         ? &j->w[1]
0732         : &j->w[0];
0733 
0734     __closure_wake_up(&w->wait);
0735     continue_at_nobarrier(cl, journal_write, bch_journal_wq);
0736 }
0737 
0738 static void journal_write_unlock(struct closure *cl)
0739     __releases(&c->journal.lock)
0740 {
0741     struct cache_set *c = container_of(cl, struct cache_set, journal.io);
0742 
0743     c->journal.io_in_flight = 0;
0744     spin_unlock(&c->journal.lock);
0745 }
0746 
0747 static void journal_write_unlocked(struct closure *cl)
0748     __releases(c->journal.lock)
0749 {
0750     struct cache_set *c = container_of(cl, struct cache_set, journal.io);
0751     struct cache *ca = c->cache;
0752     struct journal_write *w = c->journal.cur;
0753     struct bkey *k = &c->journal.key;
0754     unsigned int i, sectors = set_blocks(w->data, block_bytes(ca)) *
0755         ca->sb.block_size;
0756 
0757     struct bio *bio;
0758     struct bio_list list;
0759 
0760     bio_list_init(&list);
0761 
0762     if (!w->need_write) {
0763         closure_return_with_destructor(cl, journal_write_unlock);
0764         return;
0765     } else if (journal_full(&c->journal)) {
0766         journal_reclaim(c);
0767         spin_unlock(&c->journal.lock);
0768 
0769         btree_flush_write(c);
0770         continue_at(cl, journal_write, bch_journal_wq);
0771         return;
0772     }
0773 
0774     c->journal.blocks_free -= set_blocks(w->data, block_bytes(ca));
0775 
0776     w->data->btree_level = c->root->level;
0777 
0778     bkey_copy(&w->data->btree_root, &c->root->key);
0779     bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);
0780 
0781     w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
0782     w->data->magic      = jset_magic(&ca->sb);
0783     w->data->version    = BCACHE_JSET_VERSION;
0784     w->data->last_seq   = last_seq(&c->journal);
0785     w->data->csum       = csum_set(w->data);
0786 
0787     for (i = 0; i < KEY_PTRS(k); i++) {
0788         ca = c->cache;
0789         bio = &ca->journal.bio;
0790 
0791         atomic_long_add(sectors, &ca->meta_sectors_written);
0792 
0793         bio_reset(bio, ca->bdev, REQ_OP_WRITE | 
0794               REQ_SYNC | REQ_META | REQ_PREFLUSH | REQ_FUA);
0795         bio->bi_iter.bi_sector  = PTR_OFFSET(k, i);
0796         bio->bi_iter.bi_size = sectors << 9;
0797 
0798         bio->bi_end_io  = journal_write_endio;
0799         bio->bi_private = w;
0800         bch_bio_map(bio, w->data);
0801 
0802         trace_bcache_journal_write(bio, w->data->keys);
0803         bio_list_add(&list, bio);
0804 
0805         SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);
0806 
0807         ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
0808     }
0809 
0810     /* If KEY_PTRS(k) == 0, this jset gets lost in air */
0811     BUG_ON(i == 0);
0812 
0813     atomic_dec_bug(&fifo_back(&c->journal.pin));
0814     bch_journal_next(&c->journal);
0815     journal_reclaim(c);
0816 
0817     spin_unlock(&c->journal.lock);
0818 
0819     while ((bio = bio_list_pop(&list)))
0820         closure_bio_submit(c, bio, cl);
0821 
0822     continue_at(cl, journal_write_done, NULL);
0823 }
0824 
0825 static void journal_write(struct closure *cl)
0826 {
0827     struct cache_set *c = container_of(cl, struct cache_set, journal.io);
0828 
0829     spin_lock(&c->journal.lock);
0830     journal_write_unlocked(cl);
0831 }
0832 
0833 static void journal_try_write(struct cache_set *c)
0834     __releases(c->journal.lock)
0835 {
0836     struct closure *cl = &c->journal.io;
0837     struct journal_write *w = c->journal.cur;
0838 
0839     w->need_write = true;
0840 
0841     if (!c->journal.io_in_flight) {
0842         c->journal.io_in_flight = 1;
0843         closure_call(cl, journal_write_unlocked, NULL, &c->cl);
0844     } else {
0845         spin_unlock(&c->journal.lock);
0846     }
0847 }
0848 
0849 static struct journal_write *journal_wait_for_write(struct cache_set *c,
0850                             unsigned int nkeys)
0851     __acquires(&c->journal.lock)
0852 {
0853     size_t sectors;
0854     struct closure cl;
0855     bool wait = false;
0856     struct cache *ca = c->cache;
0857 
0858     closure_init_stack(&cl);
0859 
0860     spin_lock(&c->journal.lock);
0861 
0862     while (1) {
0863         struct journal_write *w = c->journal.cur;
0864 
0865         sectors = __set_blocks(w->data, w->data->keys + nkeys,
0866                        block_bytes(ca)) * ca->sb.block_size;
0867 
0868         if (sectors <= min_t(size_t,
0869                      c->journal.blocks_free * ca->sb.block_size,
0870                      PAGE_SECTORS << JSET_BITS))
0871             return w;
0872 
0873         if (wait)
0874             closure_wait(&c->journal.wait, &cl);
0875 
0876         if (!journal_full(&c->journal)) {
0877             if (wait)
0878                 trace_bcache_journal_entry_full(c);
0879 
0880             /*
0881              * XXX: If we were inserting so many keys that they
0882              * won't fit in an _empty_ journal write, we'll
0883              * deadlock. For now, handle this in
0884              * bch_keylist_realloc() - but something to think about.
0885              */
0886             BUG_ON(!w->data->keys);
0887 
0888             journal_try_write(c); /* unlocks */
0889         } else {
0890             if (wait)
0891                 trace_bcache_journal_full(c);
0892 
0893             journal_reclaim(c);
0894             spin_unlock(&c->journal.lock);
0895 
0896             btree_flush_write(c);
0897         }
0898 
0899         closure_sync(&cl);
0900         spin_lock(&c->journal.lock);
0901         wait = true;
0902     }
0903 }
0904 
0905 static void journal_write_work(struct work_struct *work)
0906 {
0907     struct cache_set *c = container_of(to_delayed_work(work),
0908                        struct cache_set,
0909                        journal.work);
0910     spin_lock(&c->journal.lock);
0911     if (c->journal.cur->dirty)
0912         journal_try_write(c);
0913     else
0914         spin_unlock(&c->journal.lock);
0915 }
0916 
0917 /*
0918  * Entry point to the journalling code - bio_insert() and btree_invalidate()
0919  * pass bch_journal() a list of keys to be journalled, and then
0920  * bch_journal() hands those same keys off to btree_insert_async()
0921  */
0922 
0923 atomic_t *bch_journal(struct cache_set *c,
0924               struct keylist *keys,
0925               struct closure *parent)
0926 {
0927     struct journal_write *w;
0928     atomic_t *ret;
0929 
0930     /* No journaling if CACHE_SET_IO_DISABLE set already */
0931     if (unlikely(test_bit(CACHE_SET_IO_DISABLE, &c->flags)))
0932         return NULL;
0933 
0934     if (!CACHE_SYNC(&c->cache->sb))
0935         return NULL;
0936 
0937     w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
0938 
0939     memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
0940     w->data->keys += bch_keylist_nkeys(keys);
0941 
0942     ret = &fifo_back(&c->journal.pin);
0943     atomic_inc(ret);
0944 
0945     if (parent) {
0946         closure_wait(&w->wait, parent);
0947         journal_try_write(c);
0948     } else if (!w->dirty) {
0949         w->dirty = true;
0950         queue_delayed_work(bch_flush_wq, &c->journal.work,
0951                    msecs_to_jiffies(c->journal_delay_ms));
0952         spin_unlock(&c->journal.lock);
0953     } else {
0954         spin_unlock(&c->journal.lock);
0955     }
0956 
0957 
0958     return ret;
0959 }
0960 
0961 void bch_journal_meta(struct cache_set *c, struct closure *cl)
0962 {
0963     struct keylist keys;
0964     atomic_t *ref;
0965 
0966     bch_keylist_init(&keys);
0967 
0968     ref = bch_journal(c, &keys, cl);
0969     if (ref)
0970         atomic_dec_bug(ref);
0971 }
0972 
0973 void bch_journal_free(struct cache_set *c)
0974 {
0975     free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
0976     free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
0977     free_fifo(&c->journal.pin);
0978 }
0979 
0980 int bch_journal_alloc(struct cache_set *c)
0981 {
0982     struct journal *j = &c->journal;
0983 
0984     spin_lock_init(&j->lock);
0985     spin_lock_init(&j->flush_write_lock);
0986     INIT_DELAYED_WORK(&j->work, journal_write_work);
0987 
0988     c->journal_delay_ms = 100;
0989 
0990     j->w[0].c = c;
0991     j->w[1].c = c;
0992 
0993     if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
0994         !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL|__GFP_COMP, JSET_BITS)) ||
0995         !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL|__GFP_COMP, JSET_BITS)))
0996         return -ENOMEM;
0997 
0998     return 0;
0999 }