// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/swap.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>

/*
 * Ceph address space ops.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages imply there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps first).  Each cap_snap has a distinct
 * ceph_snap_context.
 *
 * The head count of dirty pages is moved _away_ from the inode's
 * "head" state into that cap_snap: capsnap->dirty_pages takes over
 * the old i_wrbuffer_ref_head count, and i_wrbuffer_ref_head restarts
 * at zero for pages dirtied against the new (post-snapshot) head
 * context.
 *
 * Writeback proceeds in snap order: dirty pages belonging to the
 * oldest snap context must be flushed first, because a page can only
 * be written to the OSDs with the snap context that was current when
 * it was dirtied.  Once a cap_snap's dirty_pages count drops to zero,
 * the capsnap can be flushed to the MDS and released.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata);

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (folio_test_dirty(folio)) {
		dout("%p dirty_folio %p idx %lu -- already dirty\n",
		     mapping->host, folio, folio->index);
		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
		return false;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); /* caller should hold Fw reference */
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p dirty_folio %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, folio, folio->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in folio->private.  Also set
	 * PagePrivate so that we get invalidate_folio callback.
	 */
	VM_WARN_ON_FOLIO(folio->private, folio);
	folio_attach_private(folio, snapc);

	return ceph_fscache_dirty_folio(mapping, folio);
}

/*
 * If we are truncating the full folio (i.e. offset == 0), adjust the
 * dirty folio counters appropriately.  Only called if there is private
 * data on the folio.
 */
static void ceph_invalidate_folio(struct folio *folio, size_t offset,
				  size_t length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	inode = folio->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != folio_size(folio)) {
		dout("%p invalidate_folio idx %lu partial dirty page %zu~%zu\n",
		     inode, folio->index, offset, length);
		return;
	}

	WARN_ON(!folio_test_locked(folio));
	if (folio_test_private(folio)) {
		dout("%p invalidate_folio idx %lu full dirty page\n",
		     inode, folio->index);

		snapc = folio_detach_private(folio);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
	}

	folio_wait_fscache(folio);
}

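/*
 * Called by the VM when it wants to free the folio.  We cannot release
 * a folio that still carries private data (a snap context for dirty
 * data), and we only wait out an in-flight fscache write when the
 * caller can tolerate it (not kswapd, and __GFP_FS allowed).
 */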
static bool ceph_release_folio(struct folio *folio, gfp_t gfp)
{
	struct inode *inode = folio->mapping->host;

	dout("%llx:%llx release_folio idx %lu (%sdirty)\n",
	     ceph_vinop(inode),
	     folio->index, folio_test_dirty(folio) ? "" : "not ");

	if (folio_test_private(folio))
		return false;

	if (folio_test_fscache(folio)) {
		if (current_is_kswapd() || !(gfp & __GFP_FS))
			return false;
		folio_wait_fscache(folio);
	}
	ceph_fscache_note_page_release(inode);
	return true;
}

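/*
 * Expand a readahead request so that it covers whole stripe units:
 * round the start down and the length up to the file layout's
 * stripe_unit boundaries.
 */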
static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq)
{
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	u32 blockoff;
	u64 blockno;

	/* Expand the start downward */
	blockno = div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	rreq->start = blockno * lo->stripe_unit;
	rreq->len += blockoff;

	/* Now, round up the length to the next block */
	rreq->len = roundup(rreq->len, lo->stripe_unit);
}

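/*
 * Clamp a netfs subrequest so that it does not cross an object
 * boundary and does not exceed the mount's maximum read size.
 */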
static bool ceph_netfs_clamp_length(struct netfs_io_subrequest *subreq)
{
	struct inode *inode = subreq->rreq->inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	subreq->len = min(xlen, fsc->mount_options->rsize);
	return true;
}

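/*
 * Completion callback for an OSD read issued on behalf of netfs:
 * record read latency metrics, translate OSD errors, and hand the
 * result back via netfs_subreq_terminated().
 */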
static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_io_subrequest *subreq = req->r_priv;
	int num_pages;
	int err = req->r_result;

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
	     subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	if (err >= 0 && err < subreq->len)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);

	netfs_subreq_terminated(subreq, err, false);

	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
	ceph_put_page_vector(osd_data->pages, num_pages, false);
	iput(req->r_inode);
}

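/*
 * Handle a read against an inode whose data is stored inline on the
 * MDS: fetch the inline blob with a GETATTR and copy it into the
 * request's pagecache pages.  Returns false if the data has meanwhile
 * been uninlined, in which case the caller falls back to an OSD read.
 */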
static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_mds_reply_info_in *iinfo;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct iov_iter iter;
	ssize_t err = 0;
	size_t len;
	int mode;

	__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);

	if (subreq->start >= inode->i_size)
		goto out;

	/* We need to fetch the inline data. */
	mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_ino1 = ci->i_vino;
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
	req->r_num_caps = 2;

	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err < 0)
		goto out;

	rinfo = &req->r_reply_info;
	iinfo = &rinfo->targeti;
	if (iinfo->inline_version == CEPH_INLINE_NONE) {
		/* The data got uninlined */
		ceph_mdsc_put_request(req);
		return false;
	}

	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &iter);
	if (err == 0)
		err = -EFAULT;

	ceph_mdsc_put_request(req);
out:
	netfs_subreq_terminated(subreq, err, false);
	return true;
}

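/*
 * Issue an async OSD read for a netfs subrequest.  The pages backing
 * the read come straight out of the pagecache xarray; completion is
 * handled in finish_netfs_read().
 */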
static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
	struct iov_iter iter;
	struct page **pages;
	size_t page_off;
	int err = 0;
	u64 len = subreq->len;

	if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq))
		return;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino,
			subreq->start, &len, 0, 1, CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;
		goto out;
	}

	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = iov_iter_get_pages_alloc2(&iter, &pages, len, &page_off);
	if (err < 0) {
		dout("%s: iov_iter_get_pages_alloc2 returned %d\n", __func__, err);
		goto out;
	}

	/* should always give us a page-aligned read */
	WARN_ON_ONCE(page_off);
	len = err;
	err = 0;

	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);

	ceph_osdc_start_request(req->r_osdc, req);
out:
	ceph_osdc_put_request(req);
	if (err)
		netfs_subreq_terminated(subreq, err, false);
	dout("%s: result %d\n", __func__, err);
}

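/*
 * netfs init_request hook: for readahead, take a read cap reference up
 * front (unless the caller already holds one via an rw context) and
 * stash it in netfs_priv so ceph_netfs_free_request() can drop it.
 */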
static int ceph_init_request(struct netfs_io_request *rreq, struct file *file)
{
	struct inode *inode = rreq->inode;
	int got = 0, want = CEPH_CAP_FILE_CACHE;
	int ret = 0;

	if (rreq->origin != NETFS_READAHEAD)
		return 0;

	if (file) {
		struct ceph_rw_context *rw_ctx;
		struct ceph_file_info *fi = file->private_data;

		rw_ctx = ceph_find_rw_context(fi);
		if (rw_ctx)
			return 0;
	}

	/*
	 * readahead callers do not necessarily hold Fcb caps
	 * (e.g. fadvise, madvise).
	 */
	ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
	if (ret < 0) {
		dout("start_read %p, error getting cap\n", inode);
		return ret;
	}

	if (!(got & want)) {
		dout("start_read %p, no cache cap\n", inode);
		return -EACCES;
	}
	if (ret == 0)
		return -EACCES;

	rreq->netfs_priv = (void *)(uintptr_t)got;
	return 0;
}

static void ceph_netfs_free_request(struct netfs_io_request *rreq)
{
	struct ceph_inode_info *ci = ceph_inode(rreq->inode);
	int got = (uintptr_t)rreq->netfs_priv;

	if (got)
		ceph_put_cap_refs(ci, got);
}

const struct netfs_request_ops ceph_netfs_ops = {
	.init_request		= ceph_init_request,
	.free_request		= ceph_netfs_free_request,
	.begin_cache_operation	= ceph_begin_cache_operation,
	.issue_read		= ceph_netfs_issue_read,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.clamp_length		= ceph_netfs_clamp_length,
	.check_write_begin	= ceph_netfs_check_write_begin,
};

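/*
 * Helpers for marking pages as being written to the cache and for
 * kicking off a write to fscache; these compile away to no-ops when
 * CONFIG_CEPH_FSCACHE is not set.
 */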
#ifdef CONFIG_CEPH_FSCACHE
static void ceph_set_page_fscache(struct page *page)
{
	set_page_fscache(page);
}

static void ceph_fscache_write_terminated(void *priv, ssize_t error, bool was_async)
{
	struct inode *inode = priv;

	if (IS_ERR_VALUE(error) && error != -ENOBUFS)
		ceph_fscache_invalidate(inode, false);
}

static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct fscache_cookie *cookie = ceph_fscache_cookie(ci);

	fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode),
			       ceph_fscache_write_terminated, inode, caching);
}
#else
static inline void ceph_set_page_fscache(struct page *page)
{
}

static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
}
#endif /* CONFIG_CEPH_FSCACHE */

struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;
	bool head_snapc;
};

/*
 * Get a ref to the snap context for the oldest snapc with dirty data
 * on this inode (a cap_snap, or failing that the head context), and
 * optionally fill in the matching writeback parameters (size and
 * truncate info) via @ctl.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for the snap? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

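/*
 * Work out how many bytes of @page, starting at @start, should
 * actually be written: clamp to the size recorded in the page's
 * cap_snap (if any) and to the end of the page.
 */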
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);

	if (snapc != ci->i_head_snapc) {
		bool found = false;
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > page_offset(page) + thp_size(page))
		end = page_offset(page) + thp_size(page);
	return end > start ? end - start : 0;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust
 * the dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct folio *folio = page_folio(page);
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = thp_size(page);
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	bool caching = ceph_is_cache_enabled(inode);

	dout("writepage %p idx %lu\n", page, page->index);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		dout("folio at %lu beyond eof %llu\n", folio->index,
		     ceph_wbc.i_size);
		folio_invalidate(folio, 0, folio_size(folio));
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = true;

	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    page_off, &len, 0, 1, CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq,
				    ceph_wbc.truncate_size, true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		return PTR_ERR(req);
	}

	set_page_writeback(page);
	if (caching)
		ceph_set_page_fscache(page);
	ceph_fscache_write_to_cache(inode, page_off, len, caching);

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > thp_size(page));
	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(osdc, req);
	err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;	/* vfs expects us to return 0 */
	}
	oldest = detach_page_private(page);
	WARN_ON_ONCE(oldest != snapc);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = false;

	return err;
}

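/*
 * .writepage entry point: hold an inode reference across the write and
 * swallow -ERESTARTSYS so that a killed direct reclaimer does not set
 * a spurious mapping/page error.
 */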
static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;

	BUG_ON(!inode);

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    ceph_inode_to_client(inode)->write_congested)
		return AOP_WRITEPAGE_ACTIVATE;

	ihold(inode);
	wait_on_page_fscache(page);

	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	unsigned int len = 0;
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) {
			pr_warn("%s incorrect op %d req %p index %d tid %llu\n",
				__func__, req->r_ops[i].op, req, i, req->r_tid);
			break;
		}

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		len += osd_data->length;
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			    CONGESTION_OFF_THRESH(
				    fsc->mount_options->congestion_kb))
				fsc->write_congested = false;

			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);
			dout("unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;
	bool caching = ceph_is_cache_enabled(inode);

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    fsc->write_congested)
		return 0;

	dout("writepages_start %p (mode=%s)\n", inode,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO;	/* we're in a forced umount, don't write */
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	pagevec_init(&pvec);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
		if (index > 0)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		bool from_pool = false;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						      end, PAGECACHE_TAG_DIRTY);
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);	/* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}

			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				struct folio *folio = page_folio(page);

				dout("folio at %lu beyond eof %llu\n",
				     folio->index, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
				     folio_pos(folio) >= i_size_read(inode)) &&
				    folio_clear_dirty_for_io(folio))
					folio_invalidate(folio, 0,
							 folio_size(folio));
				folio_unlock(folio);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (PageWriteback(page) || PageFsCache(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
				wait_on_page_fscache(page);
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
				if (!pages) {
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
							    CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in pvec */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb))
				fsc->write_congested = true;

			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

			len += thp_size(page);
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			unsigned j, n = 0;

			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_release(&pvec);
				goto get_more_pages;
			}
		}

new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout, vino,
					    offset, &len, 0, num_ops,
					    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					    snapc, ceph_wbc.truncate_seq,
					    ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						    &ci->i_layout, vino,
						    offset, &len, 0,
						    min(num_ops,
							CEPH_OSD_SLAB_OPS),
						    CEPH_OSD_OP_WRITE,
						    CEPH_OSD_FLAG_WRITE,
						    snapc, ceph_wbc.truncate_seq,
						    ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     thp_size(page) - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			/*
			 * Discontinuity in page range? Ceph can handle that by just passing
			 * multiple extents in the write op.
			 */
			if (offset + len != cur_offset) {
				/* If it's full, stop here */
				if (op_idx + 1 == req->r_num_ops)
					break;

				/* Kick off an fscache write with what we have so far. */
				ceph_fscache_write_to_cache(inode, offset, len, caching);

				/* Start a new extent */
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							from_pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			if (caching)
				ceph_set_page_fscache(pages[i]);
			len += thp_size(page);
		}
		ceph_fscache_write_to_cache(inode, offset, len, caching);

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - thp_size(page);
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, from_pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		from_pool = false;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
			if (!pages) {
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		ceph_osdc_start_request(&fsc->client->osdc, req);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 &&
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
							PAGECACHE_TAG_WRITEBACK))) {
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}

/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context.  Returns a
 * conflicting context if there is one (and the page must be rewritten
 * with it), NULL if there isn't, or a negative error code on other
 * errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (ceph_inode_is_shutdown(inode)) {
		dout(" page %p %llx:%llx is shutdown\n", page,
		     ceph_vinop(inode));
		return ERR_PTR(-ESTALE);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

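/*
 * netfs check_write_begin hook: if the target folio is dirty in an
 * older snap context, drop it, kick off writeback, wait until that
 * context becomes writeable or written, then ask the caller to retry
 * with -EAGAIN.
 */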
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(folio_page(*foliop, 0));
	if (snapc) {
		int r;

		folio_unlock(*foliop);
		folio_put(*foliop);
		*foliop = NULL;
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct folio *folio = NULL;
	int r;

	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, &folio, NULL);
	if (r < 0)
		return r;

	folio_wait_fscache(folio);
	WARN_ON_ONCE(!folio_test_locked(folio));
	*pagep = &folio->page;
	return 0;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *subpage, void *fsdata)
{
	struct folio *folio = page_folio(subpage);
	struct inode *inode = file_inode(file);
	bool check_cap = false;

	dout("write_end file %p inode %p folio %p %d~%d (%d)\n", file,
	     inode, folio, (int)pos, (int)copied, (int)len);

	if (!folio_test_uptodate(folio)) {
		/* just return that nothing was copied on a short copy */
		if (copied < len) {
			copied = 0;
			goto out;
		}
		folio_mark_uptodate(folio);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	folio_mark_dirty(folio);

out:
	folio_unlock(folio);
	folio_put(folio);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

const struct address_space_operations ceph_aops = {
	.read_folio = netfs_read_folio,
	.readahead = netfs_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.dirty_folio = ceph_dirty_folio,
	.invalidate_folio = ceph_invalidate_folio,
	.release_folio = ceph_release_folio,
	.direct_IO = noop_direct_IO,
};

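/*
 * Block all signals except SIGKILL for the duration of a page fault,
 * so that cap acquisition is only interruptible by a fatal signal.
 */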
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
	     inode, ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	dout("filemap_fault %p %llu got cap refs on %s\n",
	     inode, off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    !ceph_has_inline_data(ci)) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
		     inode, off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page;

		filemap_invalidate_lock_shared(mapping);
		page = find_or_create_page(mapping, 0,
				mapping_gfp_constraint(mapping, ~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		filemap_invalidate_unlock_shared(mapping);
		dout("filemap_fault %p %llu read inline data ret %x\n",
		     inode, off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}

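/*
 * Make a page writeable on fault: take Fb caps, wait out any
 * incompatible snap context, and dirty the page while it is locked.
 */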
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (off + thp_size(page) <= size)
		len = thp_size(page);
	else
		len = offset_in_thp(page, size);

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

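/*
 * Populate the first page of an inode with inline data received from
 * the MDS, so that subsequent reads are served from the pagecache.
 */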
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

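/*
 * Migrate inline data out to the OSDs: create the first object, write
 * the cached data into it, and record inline_version in object xattrs
 * (guarded by a CMPXATTR greater-than check so a racing uninline
 * cannot move the version backwards).  On success, mark the inode
 * CEPH_INLINE_NONE and dirty the Fw cap.
 */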
int ceph_uninline_data(struct file *file)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_cap_flush *prealloc_cf;
	struct folio *folio = NULL;
	u64 inline_version = CEPH_INLINE_NONE;
	struct page *pages[1];
	int err = 0;
	u64 len;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == CEPH_INLINE_NONE)
		return 0;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	if (inline_version == 1) /* initial version, no data */
		goto out_uninline;

	folio = read_mapping_folio(inode->i_mapping, 0, file);
	if (IS_ERR(folio)) {
		err = PTR_ERR(folio);
		goto out;
	}

	folio_lock(folio);

	len = i_size_read(inode);
	if (len > folio_size(folio))
		len = folio_size(folio);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out_unlock;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	pages[0] = folio_page(folio, 0);
	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put_req;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put_req;
	}

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_uninline:
	if (!err) {
		int dirty;

		/* Set to CAP_INLINE_NONE and dirty the caps */
		down_read(&fsc->mdsc->snap_rwsem);
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		up_read(&fsc->mdsc->snap_rwsem);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}
out_put_req:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out_unlock:
	if (folio) {
		folio_unlock(folio);
		folio_put(folio);
	}
out:
	ceph_free_cap_flush(prealloc_cf);
	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->read_folio)
		return -ENOEXEC;
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

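/*
 * Check (and cache) whether the client may read and write the given
 * data pool.  Permissions are probed by issuing a dummy STAT read and
 * an exclusive-create write against the inode's first object and
 * inspecting the resulting error codes.
 */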
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = ci->netfs.inode.i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

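/*
 * Check whether the client is permitted to perform @need (read and/or
 * write caps) against the inode's current data pool, consulting the
 * cached result in i_ceph_flags where possible.
 */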
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	/* Only need to verify permissions on regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for snapshot, head of the first object may have
		 * already been deleted. Skip check anyway.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}