0001
0002 #include <linux/ceph/ceph_debug.h>
0003
0004 #include <linux/sort.h>
0005 #include <linux/slab.h>
0006 #include <linux/iversion.h>
0007 #include "super.h"
0008 #include "mds_client.h"
0009 #include <linux/ceph/decode.h>
0010
0011
0012 #define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
/*
 * Take a reference on a snap realm.
 *
 * Caller must hold mdsc->snap_rwsem (read or write; asserted below),
 * which guarantees the realm cannot be destroyed underneath us.
 */
void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
			 struct ceph_snap_realm *realm)
{
	lockdep_assert_held(&mdsc->snap_rwsem);

	/*
	 * Fast path: the refcount is still non-zero, so a plain
	 * increment-if-not-zero suffices and no spinlock is needed.
	 */
	if (atomic_inc_not_zero(&realm->nref))
		return;

	/*
	 * The count had reached zero, so the realm may sit on the
	 * mdsc->snap_empty list awaiting destruction.  Take
	 * snap_empty_lock to serialize with ceph_put_snap_realm();
	 * if our increment is the 0 -> 1 transition, pull the realm
	 * back off the empty list.
	 */
	spin_lock(&mdsc->snap_empty_lock);
	if (atomic_inc_return(&realm->nref) == 1)
		list_del_init(&realm->empty_item);
	spin_unlock(&mdsc->snap_empty_lock);
}
0084
0085 static void __insert_snap_realm(struct rb_root *root,
0086 struct ceph_snap_realm *new)
0087 {
0088 struct rb_node **p = &root->rb_node;
0089 struct rb_node *parent = NULL;
0090 struct ceph_snap_realm *r = NULL;
0091
0092 while (*p) {
0093 parent = *p;
0094 r = rb_entry(parent, struct ceph_snap_realm, node);
0095 if (new->ino < r->ino)
0096 p = &(*p)->rb_left;
0097 else if (new->ino > r->ino)
0098 p = &(*p)->rb_right;
0099 else
0100 BUG();
0101 }
0102
0103 rb_link_node(&new->node, parent, p);
0104 rb_insert_color(&new->node, root);
0105 }
0106
0107
0108
0109
0110
0111
/*
 * Allocate a new snap realm for inode number @ino and insert it into
 * the mdsc's realm tree.
 *
 * Caller must hold snap_rwsem for write.  Returns the realm with one
 * reference held for the caller — the special global snaprealm gets an
 * extra reference so it persists until
 * ceph_cleanup_global_and_empty_realms() — or ERR_PTR(-ENOMEM).
 */
static struct ceph_snap_realm *ceph_create_snap_realm(
			struct ceph_mds_client *mdsc,
			u64 ino)
{
	struct ceph_snap_realm *realm;

	lockdep_assert_held_write(&mdsc->snap_rwsem);

	realm = kzalloc(sizeof(*realm), GFP_NOFS);
	if (!realm)
		return ERR_PTR(-ENOMEM);

	/* extra, long-lived ref for the global realm (see above) */
	if (ino == CEPH_INO_GLOBAL_SNAPREALM)
		atomic_set(&realm->nref, 2);
	else
		atomic_set(&realm->nref, 1);
	realm->ino = ino;
	INIT_LIST_HEAD(&realm->children);
	INIT_LIST_HEAD(&realm->child_item);
	INIT_LIST_HEAD(&realm->empty_item);
	INIT_LIST_HEAD(&realm->dirty_item);
	INIT_LIST_HEAD(&realm->rebuild_item);
	INIT_LIST_HEAD(&realm->inodes_with_caps);
	spin_lock_init(&realm->inodes_with_caps_lock);
	__insert_snap_realm(&mdsc->snap_realms, realm);
	mdsc->num_snap_realms++;

	dout("%s %llx %p\n", __func__, realm->ino, realm);
	return realm;
}
0143
0144
0145
0146
0147
0148
0149 static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
0150 u64 ino)
0151 {
0152 struct rb_node *n = mdsc->snap_realms.rb_node;
0153 struct ceph_snap_realm *r;
0154
0155 lockdep_assert_held(&mdsc->snap_rwsem);
0156
0157 while (n) {
0158 r = rb_entry(n, struct ceph_snap_realm, node);
0159 if (ino < r->ino)
0160 n = n->rb_left;
0161 else if (ino > r->ino)
0162 n = n->rb_right;
0163 else {
0164 dout("%s %llx %p\n", __func__, r->ino, r);
0165 return r;
0166 }
0167 }
0168 return NULL;
0169 }
0170
0171 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
0172 u64 ino)
0173 {
0174 struct ceph_snap_realm *r;
0175 r = __lookup_snap_realm(mdsc, ino);
0176 if (r)
0177 ceph_get_snap_realm(mdsc, r);
0178 return r;
0179 }
0180
/* forward declaration: __destroy_snap_realm() puts the parent realm */
static void __put_snap_realm(struct ceph_mds_client *mdsc,
			     struct ceph_snap_realm *realm);
0183
0184
0185
0186
/*
 * Tear down a realm whose refcount reached zero: unlink it from the
 * realm tree, drop the reference it held on its parent, and free its
 * snap arrays and cached snap context.
 *
 * Caller must hold snap_rwsem for write.
 */
static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
				 struct ceph_snap_realm *realm)
{
	lockdep_assert_held_write(&mdsc->snap_rwsem);

	dout("%s %p %llx\n", __func__, realm, realm->ino);

	rb_erase(&realm->node, &mdsc->snap_realms);
	mdsc->num_snap_realms--;

	if (realm->parent) {
		list_del_init(&realm->child_item);
		__put_snap_realm(mdsc, realm->parent);
	}

	kfree(realm->prior_parent_snaps);
	kfree(realm->snaps);
	ceph_put_snap_context(realm->cached_context);
	kfree(realm);
}
0207
0208
0209
0210
/*
 * Drop a realm reference while already holding snap_rwsem for write.
 * Unlike ceph_put_snap_realm(), destruction happens immediately when
 * the count hits zero (no deferral to the empty list).
 */
static void __put_snap_realm(struct ceph_mds_client *mdsc,
			     struct ceph_snap_realm *realm)
{
	lockdep_assert_held_write(&mdsc->snap_rwsem);

	/*
	 * snap_empty_lock is not required here: anyone who could
	 * re-increment the count must hold snap_rwsem, which we hold
	 * for write, so no 0 -> 1 race is possible.
	 */
	if (atomic_dec_and_test(&realm->nref))
		__destroy_snap_realm(mdsc, realm);
}
0223
0224
0225
0226
/*
 * Release a realm reference (caller holds no locks).  When the count
 * drops to zero: destroy the realm right away if snap_rwsem can be
 * taken for write without blocking, otherwise park the realm on
 * mdsc->snap_empty for __cleanup_empty_realms() to reap later.
 */
void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
			 struct ceph_snap_realm *realm)
{
	/* nothing more to do unless this was the final reference */
	if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
		return;

	/* snap_empty_lock is now held (taken on the 1 -> 0 transition) */
	if (down_write_trylock(&mdsc->snap_rwsem)) {
		spin_unlock(&mdsc->snap_empty_lock);
		__destroy_snap_realm(mdsc, realm);
		up_write(&mdsc->snap_rwsem);
	} else {
		list_add(&realm->empty_item, &mdsc->snap_empty);
		spin_unlock(&mdsc->snap_empty_lock);
	}
}
0242
0243
0244
0245
0246
0247
0248
0249
/*
 * Destroy realms whose last reference was dropped while snap_rwsem
 * could not be taken (see ceph_put_snap_realm()).
 *
 * Caller must hold snap_rwsem for write.  The spinlock is dropped
 * around each __destroy_snap_realm() call and the list head is
 * re-checked on every iteration.
 */
static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
	struct ceph_snap_realm *realm;

	lockdep_assert_held_write(&mdsc->snap_rwsem);

	spin_lock(&mdsc->snap_empty_lock);
	while (!list_empty(&mdsc->snap_empty)) {
		realm = list_first_entry(&mdsc->snap_empty,
				   struct ceph_snap_realm, empty_item);
		list_del(&realm->empty_item);
		spin_unlock(&mdsc->snap_empty_lock);
		__destroy_snap_realm(mdsc, realm);
		spin_lock(&mdsc->snap_empty_lock);
	}
	spin_unlock(&mdsc->snap_empty_lock);
}
0267
/*
 * Teardown helper: drop the extra reference held on the global
 * snaprealm (taken in ceph_create_snap_realm()) and then reap any
 * realms parked on the empty list.
 */
void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc)
{
	struct ceph_snap_realm *global_realm;

	down_write(&mdsc->snap_rwsem);
	global_realm = __lookup_snap_realm(mdsc, CEPH_INO_GLOBAL_SNAPREALM);
	if (global_realm)
		ceph_put_snap_realm(mdsc, global_realm);
	__cleanup_empty_realms(mdsc);
	up_write(&mdsc->snap_rwsem);
}
0279
0280
0281
0282
0283
0284
0285
0286
0287
/*
 * Re-parent @realm under the realm with inode number @parentino,
 * creating that parent realm if it does not exist yet.  Maintains the
 * parent's children list and the reference @realm holds on its parent.
 *
 * Caller must hold snap_rwsem for write.
 *
 * Returns 0 if the parent is unchanged, 1 if it changed (the caller
 * should then rebuild snap contexts), or a negative errno.
 */
static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
				    struct ceph_snap_realm *realm,
				    u64 parentino)
{
	struct ceph_snap_realm *parent;

	lockdep_assert_held_write(&mdsc->snap_rwsem);

	if (realm->parent_ino == parentino)
		return 0;

	/* takes a reference on the (possibly new) parent */
	parent = ceph_lookup_snap_realm(mdsc, parentino);
	if (!parent) {
		parent = ceph_create_snap_realm(mdsc, parentino);
		if (IS_ERR(parent))
			return PTR_ERR(parent);
	}
	dout("%s %llx %p: %llx %p -> %llx %p\n", __func__, realm->ino,
	     realm, realm->parent_ino, realm->parent, parentino, parent);
	if (realm->parent) {
		/* drop our link to (and reference on) the old parent */
		list_del_init(&realm->child_item);
		ceph_put_snap_realm(mdsc, realm->parent);
	}
	realm->parent_ino = parentino;
	realm->parent = parent;
	list_add(&realm->child_item, &parent->children);
	return 1;
}
0316
0317
0318 static int cmpu64_rev(const void *a, const void *b)
0319 {
0320 if (*(u64 *)a < *(u64 *)b)
0321 return 1;
0322 if (*(u64 *)a > *(u64 *)b)
0323 return -1;
0324 return 0;
0325 }
0326
0327
0328
0329
0330
/*
 * Build the snap context for @realm: merge the snaps inherited from
 * its parent (those at or after parent_since), the realm's own snaps,
 * and its prior_parent_snaps into one reverse-sorted vector.
 *
 * Returns 0 on success (including when the cached context is already
 * up to date), 1 if the parent's context must be built first — the
 * parent is then pushed onto @realm_queue for the caller to process —
 * or a negative errno on allocation failure.
 */
static int build_snap_context(struct ceph_snap_realm *realm,
			      struct list_head *realm_queue,
			      struct list_head *dirty_realms)
{
	struct ceph_snap_realm *parent = realm->parent;
	struct ceph_snap_context *snapc;
	int err = 0;
	u32 num = realm->num_prior_parent_snaps + realm->num_snaps;

	/*
	 * Conservatively assume all of the parent's snaps might end up
	 * included in ours; defer to the caller if the parent's own
	 * context has not been built yet.
	 */
	if (parent) {
		if (!parent->cached_context) {
			/* ask the caller to rebuild the parent first */
			list_add(&parent->rebuild_item, realm_queue);
			return 1;
		}
		num += parent->cached_context->num_snaps;
	}

	/*
	 * Do we actually need to rebuild?  Not if our cached context's
	 * seq matches the realm seq and is at least as new as the
	 * parent's (rebuilds proceed downward in the hierarchy, so the
	 * parent context is already current at this point).
	 */
	if (realm->cached_context &&
	    realm->cached_context->seq == realm->seq &&
	    (!parent ||
	     realm->cached_context->seq >= parent->cached_context->seq)) {
		dout("%s %llx %p: %p seq %lld (%u snaps) (unchanged)\n",
		     __func__, realm->ino, realm, realm->cached_context,
		     realm->cached_context->seq,
		     (unsigned int)realm->cached_context->num_snaps);
		return 0;
	}

	/* allocate a new snap context, guarding the size computation */
	err = -ENOMEM;
	if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
		goto fail;
	snapc = ceph_create_snap_context(num, GFP_NOFS);
	if (!snapc)
		goto fail;

	/* build (reverse-sorted) snap vector */
	num = 0;
	snapc->seq = realm->seq;
	if (parent) {
		u32 i;

		/*
		 * Include only those parent snaps that occurred after
		 * this realm became a child of that parent.
		 */
		for (i = 0; i < parent->cached_context->num_snaps; i++)
			if (parent->cached_context->snaps[i] >=
			    realm->parent_since)
				snapc->snaps[num++] =
					parent->cached_context->snaps[i];
		if (parent->cached_context->seq > snapc->seq)
			snapc->seq = parent->cached_context->seq;
	}
	memcpy(snapc->snaps + num, realm->snaps,
	       sizeof(u64)*realm->num_snaps);
	num += realm->num_snaps;
	memcpy(snapc->snaps + num, realm->prior_parent_snaps,
	       sizeof(u64)*realm->num_prior_parent_snaps);
	num += realm->num_prior_parent_snaps;

	/* newest snap first */
	sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
	snapc->num_snaps = num;
	dout("%s %llx %p: %p seq %lld (%u snaps)\n", __func__, realm->ino,
	     realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps);

	ceph_put_snap_context(realm->cached_context);
	realm->cached_context = snapc;
	/* queue realm for cap_snap creation */
	list_add_tail(&realm->dirty_item, dirty_realms);
	return 0;

fail:
	/*
	 * On failure, drop any stale cached context so that nobody
	 * keeps using an out-of-date snap list for this realm.
	 */
	if (realm->cached_context) {
		ceph_put_snap_context(realm->cached_context);
		realm->cached_context = NULL;
	}
	pr_err("%s %llx %p fail %d\n", __func__, realm->ino, realm, err);
	return err;
}
0423
0424
0425
0426
0427 static void rebuild_snap_realms(struct ceph_snap_realm *realm,
0428 struct list_head *dirty_realms)
0429 {
0430 LIST_HEAD(realm_queue);
0431 int last = 0;
0432 bool skip = false;
0433
0434 list_add_tail(&realm->rebuild_item, &realm_queue);
0435
0436 while (!list_empty(&realm_queue)) {
0437 struct ceph_snap_realm *_realm, *child;
0438
0439 _realm = list_first_entry(&realm_queue,
0440 struct ceph_snap_realm,
0441 rebuild_item);
0442
0443
0444
0445
0446
0447
0448 if (last < 0) {
0449 list_del_init(&_realm->rebuild_item);
0450 continue;
0451 }
0452
0453 last = build_snap_context(_realm, &realm_queue, dirty_realms);
0454 dout("%s %llx %p, %s\n", __func__, _realm->ino, _realm,
0455 last > 0 ? "is deferred" : !last ? "succeeded" : "failed");
0456
0457
0458 list_for_each_entry(child, &_realm->children, child_item) {
0459 if (!list_empty(&child->rebuild_item)) {
0460 skip = true;
0461 break;
0462 }
0463 }
0464
0465 if (!skip) {
0466 list_for_each_entry(child, &_realm->children, child_item)
0467 list_add_tail(&child->rebuild_item, &realm_queue);
0468 }
0469
0470
0471 if (last <= 0)
0472 list_del_init(&_realm->rebuild_item);
0473 }
0474 }
0475
0476
0477
0478
0479
0480
0481 static int dup_array(u64 **dst, __le64 *src, u32 num)
0482 {
0483 u32 i;
0484
0485 kfree(*dst);
0486 if (num) {
0487 *dst = kcalloc(num, sizeof(u64), GFP_NOFS);
0488 if (!*dst)
0489 return -ENOMEM;
0490 for (i = 0; i < num; i++)
0491 (*dst)[i] = get_unaligned_le64(src + i);
0492 } else {
0493 *dst = NULL;
0494 }
0495 return 0;
0496 }
0497
0498 static bool has_new_snaps(struct ceph_snap_context *o,
0499 struct ceph_snap_context *n)
0500 {
0501 if (n->num_snaps == 0)
0502 return false;
0503
0504 return n->snaps[0] > o->seq;
0505 }
0506
0507
0508
0509
0510
0511
0512
0513
0514
0515
0516
0517
0518
0519
0520
/*
 * If the inode has dirty state (dirty caps, dirty pages, or writes in
 * flight) under its current ("head") snap context, queue a cap_snap
 * capturing that state so it can be flushed back to the MDS against
 * the old context, then switch i_head_snapc to the realm's new cached
 * context.  The preallocated *@pcapsnap is consumed (set to NULL) only
 * if a cap_snap was actually queued; otherwise the caller may reuse it.
 */
static void ceph_queue_cap_snap(struct ceph_inode_info *ci,
				struct ceph_cap_snap **pcapsnap)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_snap_context *old_snapc, *new_snapc;
	struct ceph_cap_snap *capsnap = *pcapsnap;
	struct ceph_buffer *old_blob = NULL;
	int used, dirty;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	old_snapc = ci->i_head_snapc;
	new_snapc = ci->i_snap_realm->cached_context;

	/*
	 * If there is a write in flight, treat that as a dirty Fw even
	 * if it has not been marked dirty yet.
	 */
	if (used & CEPH_CAP_FILE_WR)
		dirty |= CEPH_CAP_FILE_WR;

	if (__ceph_have_pending_cap_snap(ci)) {
		/*
		 * There is no point in queuing multiple "pending"
		 * cap_snaps: the existing one already covers the dirty
		 * head state.
		 */
		dout("%s %p %llx.%llx already pending\n",
		     __func__, inode, ceph_vinop(inode));
		goto update_snapc;
	}
	if (ci->i_wrbuffer_ref_head == 0 &&
	    !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
		/* nothing dirty and nothing being written: no snapshot
		 * of the head state is needed */
		dout("%s %p %llx.%llx nothing dirty|writing\n",
		     __func__, inode, ceph_vinop(inode));
		goto update_snapc;
	}

	/* dirty state implies a head snap context must exist */
	BUG_ON(!old_snapc);

	/*
	 * A FLUSHSNAP message is only required when there is actually
	 * a new snapshot; otherwise the cap_snap is still created (the
	 * write and writeback paths need it) but marked as not needing
	 * a flush.
	 */
	if (has_new_snaps(old_snapc, new_snapc)) {
		if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
			capsnap->need_flush = true;
	} else {
		if (!(used & CEPH_CAP_FILE_WR) &&
		    ci->i_wrbuffer_ref_head == 0) {
			/* no new snap, no dirty pages, no writer */
			dout("%s %p %llx.%llx no new_snap|dirty_page|writing\n",
			     __func__, inode, ceph_vinop(inode));
			goto update_snapc;
		}
	}

	dout("%s %p %llx.%llx cap_snap %p queuing under %p %s %s\n",
	     __func__, inode, ceph_vinop(inode), capsnap, old_snapc,
	     ceph_cap_string(dirty), capsnap->need_flush ? "" : "no_flush");
	/* the queued cap_snap holds an inode reference */
	ihold(inode);

	capsnap->follows = old_snapc->seq;
	capsnap->issued = __ceph_caps_issued(ci, NULL);
	capsnap->dirty = dirty;

	capsnap->mode = inode->i_mode;
	capsnap->uid = inode->i_uid;
	capsnap->gid = inode->i_gid;

	if (dirty & CEPH_CAP_XATTR_EXCL) {
		/* snapshot the current xattr blob */
		old_blob = __ceph_build_xattrs_blob(ci);
		capsnap->xattr_blob =
			ceph_buffer_get(ci->i_xattrs.blob);
		capsnap->xattr_version = ci->i_xattrs.version;
	} else {
		capsnap->xattr_blob = NULL;
		capsnap->xattr_version = 0;
	}

	capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;

	/*
	 * Transfer the head's dirty-page count and snap context to the
	 * cap_snap (old_snapc's reference moves with it).
	 */
	capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
	ci->i_wrbuffer_ref_head = 0;
	capsnap->context = old_snapc;
	list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);

	if (used & CEPH_CAP_FILE_WR) {
		/* a writer is active; finish the cap_snap when it stops */
		dout("%s %p %llx.%llx cap_snap %p snapc %p seq %llu used WR,"
		     " now pending\n", __func__, inode, ceph_vinop(inode),
		     capsnap, old_snapc, old_snapc->seq);
		capsnap->writing = 1;
	} else {
		/* note mtime, size NOW */
		__ceph_finish_cap_snap(ci, capsnap);
	}
	/* ownership of capsnap and old_snapc has been transferred */
	*pcapsnap = NULL;
	old_snapc = NULL;

update_snapc:
	if (ci->i_wrbuffer_ref_head == 0 &&
	    ci->i_wr_ref == 0 &&
	    ci->i_dirty_caps == 0 &&
	    ci->i_flushing_caps == 0) {
		/* nothing dirty remains: drop the head snap context */
		ci->i_head_snapc = NULL;
	} else {
		ci->i_head_snapc = ceph_get_snap_context(new_snapc);
		dout(" new snapc is %p\n", new_snapc);
	}
	spin_unlock(&ci->i_ceph_lock);

	/* release outside the spinlock */
	ceph_buffer_put(old_blob);
	ceph_put_snap_context(old_snapc);
}
0643
0644
0645
0646
0647
0648
0649
0650
0651
/*
 * Finalize the size, timestamps, etc. of a cap_snap once writes have
 * stopped, and mark it ready to be flushed to the MDS.
 *
 * Called with ci->i_ceph_lock held.  Returns 1 if the inode was added
 * to the mdsc's snap_flush_list (the caller should kick snap
 * flushing), or 0 if the cap_snap must wait — either dirty pages
 * remain or a buffered writer still holds i_wb_ref.
 */
int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
			    struct ceph_cap_snap *capsnap)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);

	BUG_ON(capsnap->writing);
	/* record the inode attributes as of this moment */
	capsnap->size = i_size_read(inode);
	capsnap->mtime = inode->i_mtime;
	capsnap->atime = inode->i_atime;
	capsnap->ctime = inode->i_ctime;
	capsnap->btime = ci->i_btime;
	capsnap->change_attr = inode_peek_iversion_raw(inode);
	capsnap->time_warp_seq = ci->i_time_warp_seq;
	capsnap->truncate_size = ci->i_truncate_size;
	capsnap->truncate_seq = ci->i_truncate_seq;
	if (capsnap->dirty_pages) {
		/* dirty pages must be written back first */
		dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
		     "still has %d dirty pages\n", __func__, inode,
		     ceph_vinop(inode), capsnap, capsnap->context,
		     capsnap->context->seq, ceph_cap_string(capsnap->dirty),
		     capsnap->size, capsnap->dirty_pages);
		return 0;
	}

	/* Fb cap still in use, delay it */
	if (ci->i_wb_ref) {
		dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
		     "used WRBUFFER, delaying\n", __func__, inode,
		     ceph_vinop(inode), capsnap, capsnap->context,
		     capsnap->context->seq, ceph_cap_string(capsnap->dirty),
		     capsnap->size);
		capsnap->writing = 1;
		return 0;
	}

	ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
	dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu\n",
	     __func__, inode, ceph_vinop(inode), capsnap, capsnap->context,
	     capsnap->context->seq, ceph_cap_string(capsnap->dirty),
	     capsnap->size);

	spin_lock(&mdsc->snap_flush_lock);
	if (list_empty(&ci->i_snap_flush_item))
		list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
	spin_unlock(&mdsc->snap_flush_lock);
	return 1;
}
0700
0701
0702
0703
0704
0705 static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
0706 {
0707 struct ceph_inode_info *ci;
0708 struct inode *lastinode = NULL;
0709 struct ceph_cap_snap *capsnap = NULL;
0710
0711 dout("%s %p %llx inode\n", __func__, realm, realm->ino);
0712
0713 spin_lock(&realm->inodes_with_caps_lock);
0714 list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) {
0715 struct inode *inode = igrab(&ci->netfs.inode);
0716 if (!inode)
0717 continue;
0718 spin_unlock(&realm->inodes_with_caps_lock);
0719 iput(lastinode);
0720 lastinode = inode;
0721
0722
0723
0724
0725
0726
0727 if (!capsnap) {
0728 capsnap = kmem_cache_zalloc(ceph_cap_snap_cachep, GFP_NOFS);
0729 if (!capsnap) {
0730 pr_err("ENOMEM allocating ceph_cap_snap on %p\n",
0731 inode);
0732 return;
0733 }
0734 }
0735 capsnap->cap_flush.is_capsnap = true;
0736 refcount_set(&capsnap->nref, 1);
0737 INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
0738 INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
0739 INIT_LIST_HEAD(&capsnap->ci_item);
0740
0741 ceph_queue_cap_snap(ci, &capsnap);
0742 spin_lock(&realm->inodes_with_caps_lock);
0743 }
0744 spin_unlock(&realm->inodes_with_caps_lock);
0745 iput(lastinode);
0746
0747 if (capsnap)
0748 kmem_cache_free(ceph_cap_snap_cachep, capsnap);
0749 dout("%s %p %llx done\n", __func__, realm, realm->ino);
0750 }
0751
0752
0753
0754
0755
0756
0757
0758
/*
 * Parse and apply a snap trace from the MDS: a sequence of
 * ceph_mds_snap_realm records, each followed by its snaps and
 * prior_parent_snaps arrays, describing realms up toward the root.
 * Updates our realm hierarchy to match, rebuilds snap contexts where
 * needed, and queues cap_snaps for every realm whose context changed.
 *
 * Caller must hold snap_rwsem for write.
 *
 * On success, if @realm_ret is non-NULL it receives the first realm of
 * the trace with a reference held; otherwise that reference is
 * dropped.  Returns 0, or -EIO for a truncated/corrupt trace, or a
 * negative errno from realm creation / array duplication.
 */
int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
			   void *p, void *e, bool deletion,
			   struct ceph_snap_realm **realm_ret)
{
	struct ceph_mds_snap_realm *ri;    /* encoded */
	__le64 *snaps;                     /* encoded */
	__le64 *prior_parent_snaps;        /* encoded */
	struct ceph_snap_realm *realm = NULL;
	struct ceph_snap_realm *first_realm = NULL;
	struct ceph_snap_realm *realm_to_rebuild = NULL;
	int rebuild_snapcs;
	int err = -ENOMEM;
	LIST_HEAD(dirty_realms);

	lockdep_assert_held_write(&mdsc->snap_rwsem);

	dout("%s deletion=%d\n", __func__, deletion);
more:
	rebuild_snapcs = 0;
	/* decode the next realm record plus its two snap arrays */
	ceph_decode_need(&p, e, sizeof(*ri), bad);
	ri = p;
	p += sizeof(*ri);
	ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) +
			    le32_to_cpu(ri->num_prior_parent_snaps)), bad);
	snaps = p;
	p += sizeof(u64) * le32_to_cpu(ri->num_snaps);
	prior_parent_snaps = p;
	p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);

	/* find or create the realm (reference held either way) */
	realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
	if (!realm) {
		realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
		if (IS_ERR(realm)) {
			err = PTR_ERR(realm);
			goto fail;
		}
	}

	/* ensure the parent link is correct */
	err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
	if (err < 0)
		goto fail;
	rebuild_snapcs += err;

	if (le64_to_cpu(ri->seq) > realm->seq) {
		dout("%s updating %llx %p %lld -> %lld\n", __func__,
		     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
		/* update realm parameters, snap lists */
		realm->seq = le64_to_cpu(ri->seq);
		realm->created = le64_to_cpu(ri->created);
		realm->parent_since = le64_to_cpu(ri->parent_since);

		realm->num_snaps = le32_to_cpu(ri->num_snaps);
		err = dup_array(&realm->snaps, snaps, realm->num_snaps);
		if (err < 0)
			goto fail;

		realm->num_prior_parent_snaps =
			le32_to_cpu(ri->num_prior_parent_snaps);
		err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps,
				realm->num_prior_parent_snaps);
		if (err < 0)
			goto fail;

		if (realm->seq > mdsc->last_snap_seq)
			mdsc->last_snap_seq = realm->seq;

		rebuild_snapcs = 1;
	} else if (!realm->cached_context) {
		/* realm seen for the first time; context not built yet */
		dout("%s %llx %p seq %lld new\n", __func__,
		     realm->ino, realm, realm->seq);
		rebuild_snapcs = 1;
	} else {
		dout("%s %llx %p seq %lld unchanged\n", __func__,
		     realm->ino, realm, realm->seq);
	}

	dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
	     realm, rebuild_snapcs, p, e);

	/*
	 * Track the uppermost realm from which snap contexts must be
	 * rebuilt downward; later (higher) records in the trace
	 * overwrite earlier ones.
	 */
	if (rebuild_snapcs)
		realm_to_rebuild = realm;

	/* rebuild once we reach the end (root) of the trace */
	if (realm_to_rebuild && p >= e)
		rebuild_snap_realms(realm_to_rebuild, &dirty_realms);

	/* keep a ref on the first realm for the caller; drop the rest */
	if (!first_realm)
		first_realm = realm;
	else
		ceph_put_snap_realm(mdsc, realm);

	if (p < e)
		goto more;

	/*
	 * Queue cap_snaps _after_ the new snap contexts are built, so
	 * i_head_snapc can be switched to the right context.
	 */
	while (!list_empty(&dirty_realms)) {
		realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
					 dirty_item);
		list_del_init(&realm->dirty_item);
		queue_realm_cap_snaps(realm);
	}

	if (realm_ret)
		*realm_ret = first_realm;
	else
		ceph_put_snap_realm(mdsc, first_realm);

	__cleanup_empty_realms(mdsc);
	return 0;

bad:
	err = -EIO;
fail:
	/* drop the references taken above before bailing out */
	if (realm && !IS_ERR(realm))
		ceph_put_snap_realm(mdsc, realm);
	if (first_realm)
		ceph_put_snap_realm(mdsc, first_realm);
	pr_err("%s error %d\n", __func__, err);
	return err;
}
0888
0889
0890
0891
0892
0893
0894
0895
/*
 * Flush queued cap_snaps for every inode on the mdsc's
 * snap_flush_list.  ceph_flush_snaps() caches the MDS session in
 * @session across inodes; the final session reference is dropped once
 * the list has been drained.
 */
static void flush_snaps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;
	struct ceph_mds_session *session = NULL;

	dout("%s\n", __func__);
	spin_lock(&mdsc->snap_flush_lock);
	while (!list_empty(&mdsc->snap_flush_list)) {
		ci = list_first_entry(&mdsc->snap_flush_list,
				struct ceph_inode_info, i_snap_flush_item);
		inode = &ci->netfs.inode;
		/* hold the inode across the unlocked flush call */
		ihold(inode);
		spin_unlock(&mdsc->snap_flush_lock);
		ceph_flush_snaps(ci, &session);
		iput(inode);
		spin_lock(&mdsc->snap_flush_lock);
	}
	spin_unlock(&mdsc->snap_flush_lock);

	ceph_put_mds_session(session);
	dout("%s done\n", __func__);
}
0919
0920
0921
0922
0923
0924
0925
0926
0927
0928
0929
/*
 * Move @inode from its current snap realm (if any) to @realm.  The
 * caller's reference on @realm is consumed (it now backs
 * ci->i_snap_realm), and the reference held on the old realm is
 * dropped.  realm->inode is maintained when the inode is the realm's
 * own ino.
 *
 * Caller must hold ci->i_ceph_lock.
 */
void ceph_change_snap_realm(struct inode *inode, struct ceph_snap_realm *realm)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_snap_realm *oldrealm = ci->i_snap_realm;

	lockdep_assert_held(&ci->i_ceph_lock);

	if (oldrealm) {
		spin_lock(&oldrealm->inodes_with_caps_lock);
		list_del_init(&ci->i_snap_realm_item);
		if (oldrealm->ino == ci->i_vino.ino)
			oldrealm->inode = NULL;
		spin_unlock(&oldrealm->inodes_with_caps_lock);
		ceph_put_snap_realm(mdsc, oldrealm);
	}

	ci->i_snap_realm = realm;

	if (realm) {
		spin_lock(&realm->inodes_with_caps_lock);
		list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
		if (realm->ino == ci->i_vino.ino)
			realm->inode = inode;
		spin_unlock(&realm->inodes_with_caps_lock);
	}
}
0957
0958
0959
0960
0961
0962
0963
0964
0965
0966
0967
0968
/*
 * Handle a snap notification message from the MDS.
 *
 * For CEPH_SNAP_OP_SPLIT, the message lists inodes (with caps) and
 * child realms that move into the new child realm @split; those are
 * moved first, and then the snap trace that trails the header is
 * applied (for every op) via ceph_update_snap_trace().
 */
void ceph_handle_snap(struct ceph_mds_client *mdsc,
		      struct ceph_mds_session *session,
		      struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->fsc->sb;
	int mds = session->s_mds;
	u64 split;
	int op;
	int trace_len;
	struct ceph_snap_realm *realm = NULL;
	void *p = msg->front.iov_base;
	void *e = p + msg->front.iov_len;
	struct ceph_mds_snap_head *h;
	int num_split_inos, num_split_realms;
	__le64 *split_inos = NULL, *split_realms = NULL;
	int i;
	int locked_rwsem = 0;

	/* decode the fixed-size header */
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	h = p;
	op = le32_to_cpu(h->op);
	split = le64_to_cpu(h->split);

	num_split_inos = le32_to_cpu(h->num_split_inos);
	num_split_realms = le32_to_cpu(h->num_split_realms);
	trace_len = le32_to_cpu(h->trace_len);
	p += sizeof(*h);

	dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
	     mds, ceph_snap_op_name(op), split, trace_len);

	mutex_lock(&session->s_mutex);
	inc_session_sequence(session);
	mutex_unlock(&session->s_mutex);

	down_write(&mdsc->snap_rwsem);
	locked_rwsem = 1;

	if (op == CEPH_SNAP_OP_SPLIT) {
		struct ceph_mds_snap_realm *ri;

		/*
		 * A "split" breaks part of an existing realm off into
		 * a new realm.  The MDS provides a list of inodes
		 * (with caps) and child realms that belong to the new
		 * child.
		 */
		split_inos = p;
		p += sizeof(u64) * num_split_inos;
		split_realms = p;
		p += sizeof(u64) * num_split_realms;
		ceph_decode_need(&p, e, sizeof(*ri), bad);
		/*
		 * Peek at the realm header here, but do NOT advance p:
		 * the full record is decoded below by
		 * ceph_update_snap_trace().
		 */
		ri = p;

		realm = ceph_lookup_snap_realm(mdsc, split);
		if (!realm) {
			realm = ceph_create_snap_realm(mdsc, split);
			if (IS_ERR(realm))
				goto out;
		}

		dout("splitting snap_realm %llx %p\n", realm->ino, realm);
		for (i = 0; i < num_split_inos; i++) {
			struct ceph_vino vino = {
				.ino = le64_to_cpu(split_inos[i]),
				.snap = CEPH_NOSNAP,
			};
			struct inode *inode = ceph_find_inode(sb, vino);
			struct ceph_inode_info *ci;

			if (!inode)
				continue;
			ci = ceph_inode(inode);

			spin_lock(&ci->i_ceph_lock);
			if (!ci->i_snap_realm)
				goto skip_inode;

			/*
			 * If this inode belongs to a realm that was
			 * created after this split notification was
			 * generated, leave it where it is — the newer
			 * realm already reflects the split.
			 */
			if (ci->i_snap_realm->created >
			    le64_to_cpu(ri->created)) {
				dout(" leaving %p %llx.%llx in newer realm %llx %p\n",
				     inode, ceph_vinop(inode), ci->i_snap_realm->ino,
				     ci->i_snap_realm);
				goto skip_inode;
			}
			dout(" will move %p %llx.%llx to split realm %llx %p\n",
			     inode, ceph_vinop(inode), realm->ino, realm);

			/* ref is consumed by ceph_change_snap_realm() */
			ceph_get_snap_realm(mdsc, realm);
			ceph_change_snap_realm(inode, realm);
			spin_unlock(&ci->i_ceph_lock);
			iput(inode);
			continue;

skip_inode:
			spin_unlock(&ci->i_ceph_lock);
			iput(inode);
		}

		/* we may have taken some of the old realm's children. */
		for (i = 0; i < num_split_realms; i++) {
			struct ceph_snap_realm *child =
				__lookup_snap_realm(mdsc,
					   le64_to_cpu(split_realms[i]));
			if (!child)
				continue;
			adjust_snap_realm_parent(mdsc, child, realm->ino);
		}
	}

	/*
	 * Update the realm hierarchy from the trace that trails the
	 * header (this regardless of op).
	 *
	 * NOTE(review): the return value of ceph_update_snap_trace()
	 * is ignored here, so a corrupt trace is only logged by that
	 * function — confirm this is intended.
	 */
	ceph_update_snap_trace(mdsc, p, e,
			       op == CEPH_SNAP_OP_DESTROY, NULL);

	if (op == CEPH_SNAP_OP_SPLIT)
		/* drop the reference taken on the split realm above */
		ceph_put_snap_realm(mdsc, realm);

	__cleanup_empty_realms(mdsc);

	up_write(&mdsc->snap_rwsem);

	flush_snaps(mdsc);
	return;

bad:
	pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
	ceph_msg_dump(msg);
out:
	if (locked_rwsem)
		up_write(&mdsc->snap_rwsem);
	return;
}
1116
/*
 * Map snap id @snap to an anonymous block device number (presumably
 * used as the device number for snapshot inodes — confirm against
 * callers).  Returns the map entry with a reference held, or NULL on
 * allocation failure.
 *
 * Note: snapid_map_tree is ordered with LARGER snap ids to the LEFT.
 *
 * The allocation of a new entry (and its anon bdev) is done outside
 * snapid_map_lock, so the tree is searched again afterwards in case a
 * racing lookup inserted the same snap id first.
 */
struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
					    u64 snap)
{
	struct ceph_snapid_map *sm, *exist;
	struct rb_node **p, *parent;
	int ret;

	/* first pass: look for an existing mapping */
	exist = NULL;
	spin_lock(&mdsc->snapid_map_lock);
	p = &mdsc->snapid_map_tree.rb_node;
	while (*p) {
		exist = rb_entry(*p, struct ceph_snapid_map, node);
		if (snap > exist->snap) {
			p = &(*p)->rb_left;
		} else if (snap < exist->snap) {
			p = &(*p)->rb_right;
		} else {
			/* 0 -> 1 transition pulls it off the LRU */
			if (atomic_inc_return(&exist->ref) == 1)
				list_del_init(&exist->lru);
			break;
		}
		exist = NULL;
	}
	spin_unlock(&mdsc->snapid_map_lock);
	if (exist) {
		dout("%s found snapid map %llx -> %x\n", __func__,
		     exist->snap, exist->dev);
		return exist;
	}

	/* allocate a new entry outside the spinlock */
	sm = kmalloc(sizeof(*sm), GFP_NOFS);
	if (!sm)
		return NULL;

	ret = get_anon_bdev(&sm->dev);
	if (ret < 0) {
		kfree(sm);
		return NULL;
	}

	INIT_LIST_HEAD(&sm->lru);
	atomic_set(&sm->ref, 1);
	sm->snap = snap;

	/* second pass: re-check for a racing insert, else link ours */
	exist = NULL;
	parent = NULL;
	p = &mdsc->snapid_map_tree.rb_node;
	spin_lock(&mdsc->snapid_map_lock);
	while (*p) {
		parent = *p;
		exist = rb_entry(*p, struct ceph_snapid_map, node);
		if (snap > exist->snap)
			p = &(*p)->rb_left;
		else if (snap < exist->snap)
			p = &(*p)->rb_right;
		else
			break;
		exist = NULL;
	}
	if (exist) {
		if (atomic_inc_return(&exist->ref) == 1)
			list_del_init(&exist->lru);
	} else {
		rb_link_node(&sm->node, parent, p);
		rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
	}
	spin_unlock(&mdsc->snapid_map_lock);
	if (exist) {
		/* lost the race: discard our unused entry and bdev */
		free_anon_bdev(sm->dev);
		kfree(sm);
		dout("%s found snapid map %llx -> %x\n", __func__,
		     exist->snap, exist->dev);
		return exist;
	}

	dout("%s create snapid map %llx -> %x\n", __func__,
	     sm->snap, sm->dev);
	return sm;
}
1196
/*
 * Drop a reference on a snapid map entry.  On the 1 -> 0 transition:
 * if the entry is still in the tree, timestamp it and park it on the
 * LRU for ceph_trim_snapid_map() to reap after a timeout; if its node
 * has already been cleared (removed from the tree elsewhere), free it
 * here.
 */
void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
			 struct ceph_snapid_map *sm)
{
	if (!sm)
		return;
	if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
		if (!RB_EMPTY_NODE(&sm->node)) {
			sm->last_used = jiffies;
			list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
			spin_unlock(&mdsc->snapid_map_lock);
		} else {
			/* already removed from the tree; just free */
			spin_unlock(&mdsc->snapid_map_lock);
			kfree(sm);
		}
	}
}
1215
/*
 * Free snapid map entries that have sat unreferenced on the LRU for
 * longer than CEPH_SNAPID_MAP_TIMEOUT.  Entries are unlinked from the
 * tree under the lock, then freed afterwards so free_anon_bdev() runs
 * outside the spinlock.
 */
void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
{
	struct ceph_snapid_map *sm;
	unsigned long now;
	LIST_HEAD(to_free);

	spin_lock(&mdsc->snapid_map_lock);
	now = jiffies;

	while (!list_empty(&mdsc->snapid_map_lru)) {
		sm = list_first_entry(&mdsc->snapid_map_lru,
				      struct ceph_snapid_map, lru);
		/* LRU is in last_used order; stop at the first young one */
		if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
			break;

		rb_erase(&sm->node, &mdsc->snapid_map_tree);
		list_move(&sm->lru, &to_free);
	}
	spin_unlock(&mdsc->snapid_map_lock);

	while (!list_empty(&to_free)) {
		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
		list_del(&sm->lru);
		dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
		free_anon_bdev(sm->dev);
		kfree(sm);
	}
}
1244
/*
 * Teardown: remove and free every snapid map entry.  Entries are
 * unlinked (and their rb nodes cleared) under the lock, then freed
 * outside it.  Any entry with a non-zero refcount at this point
 * indicates a leak and is reported before being freed anyway.
 */
void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
{
	struct ceph_snapid_map *sm;
	struct rb_node *p;
	LIST_HEAD(to_free);

	spin_lock(&mdsc->snapid_map_lock);
	while ((p = rb_first(&mdsc->snapid_map_tree))) {
		sm = rb_entry(p, struct ceph_snapid_map, node);
		rb_erase(p, &mdsc->snapid_map_tree);
		RB_CLEAR_NODE(p);
		list_move(&sm->lru, &to_free);
	}
	spin_unlock(&mdsc->snapid_map_lock);

	while (!list_empty(&to_free)) {
		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
		list_del(&sm->lru);
		free_anon_bdev(sm->dev);
		if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
			pr_err("snapid map %llx -> %x still in use\n",
			       sm->snap, sm->dev);
		}
		kfree(sm);
	}
}