0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  * linux/fs/nfs/direct.c
0004  *
0005  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
0006  *
0007  * High-performance uncached I/O for the Linux NFS client
0008  *
0009  * There are important applications whose performance or correctness
0010  * depends on uncached access to file data.  Database clusters
0011  * (multiple copies of the same instance running on separate hosts)
0012  * implement their own cache coherency protocol that subsumes file
0013  * system cache protocols.  Applications that process datasets
0014  * considerably larger than the client's memory do not always benefit
0015  * from a local cache.  A streaming video server, for instance, has no
0016  * need to cache the contents of a file.
0017  *
0018  * When an application requests uncached I/O, all read and write requests
0019  * are made directly to the server; data stored or fetched via these
0020  * requests is not cached in the Linux page cache.  The client does not
0021  * correct unaligned requests from applications.  All requested bytes are
0022  * held on permanent storage before a direct write system call returns to
0023  * an application.
0024  *
0025  * Solaris implements an uncached I/O facility called directio() that
0026  * is used for backups and sequential I/O to very large files.  Solaris
0027  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
0028  * an undocumented mount option.
0029  *
0030  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
0031  * help from Andrew Morton.
0032  *
0033  * 18 Dec 2001  Initial implementation for 2.4  --cel
0034  * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
0035  * 08 Jun 2003  Port to 2.5 APIs  --cel
0036  * 31 Mar 2004  Handle direct I/O without VFS support  --cel
0037  * 15 Sep 2004  Parallel async reads  --cel
0038  * 04 May 2005  support O_DIRECT with aio  --cel
0039  *
0040  */
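
     /*
      * Illustrative only (not part of the original source): a minimal
      * userspace sketch of the uncached I/O described above, assuming a
      * page-aligned buffer since the client does not correct unaligned
      * requests (error handling omitted):
      *
      *     int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
      *     void *buf;
      *     posix_memalign(&buf, 4096, 4096);
      *     read(fd, buf, 4096);    // served by nfs_file_direct_read()
      *     write(fd, buf, 4096);   // on stable storage before returning
      */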
0041 
0042 #include <linux/errno.h>
0043 #include <linux/sched.h>
0044 #include <linux/kernel.h>
0045 #include <linux/file.h>
0046 #include <linux/pagemap.h>
0047 #include <linux/kref.h>
0048 #include <linux/slab.h>
0049 #include <linux/task_io_accounting_ops.h>
0050 #include <linux/module.h>
0051 
0052 #include <linux/nfs_fs.h>
0053 #include <linux/nfs_page.h>
0054 #include <linux/sunrpc/clnt.h>
0055 
0056 #include <linux/uaccess.h>
0057 #include <linux/atomic.h>
0058 
0059 #include "internal.h"
0060 #include "iostat.h"
0061 #include "pnfs.h"
0062 #include "fscache.h"
0063 #include "nfstrace.h"
0064 
0065 #define NFSDBG_FACILITY     NFSDBG_VFS
0066 
0067 static struct kmem_cache *nfs_direct_cachep;
0068 
0069 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
0070 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
0071 static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
0072 static void nfs_direct_write_schedule_work(struct work_struct *work);
0073 
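     /*
      * dreq->io_count tracks outstanding I/O: the scheduling functions
      * take one reference up front and each nfs_pgio_header takes another
      * through nfs_direct_pgio_init().  put_dreq() returns true only for
      * whoever drops the last reference, and that caller completes the
      * direct request.
      */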
0074 static inline void get_dreq(struct nfs_direct_req *dreq)
0075 {
0076     atomic_inc(&dreq->io_count);
0077 }
0078 
0079 static inline int put_dreq(struct nfs_direct_req *dreq)
0080 {
0081     return atomic_dec_and_test(&dreq->io_count);
0082 }
0083 
0084 static void
0085 nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
0086                 const struct nfs_pgio_header *hdr,
0087                 ssize_t dreq_len)
0088 {
0089     if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
0090           test_bit(NFS_IOHDR_EOF, &hdr->flags)))
0091         return;
0092     if (dreq->max_count >= dreq_len) {
0093         dreq->max_count = dreq_len;
0094         if (dreq->count > dreq_len)
0095             dreq->count = dreq_len;
0096 
0097         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags))
0098             dreq->error = hdr->error;
0099         else /* Clear outstanding error if this is EOF */
0100             dreq->error = 0;
0101     }
0102 }
0103 
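     /*
      * Illustrative example: a 16 KiB direct read split into two 8 KiB
      * RPCs, where the second hits EOF after 4 KiB.  That header reports
      * good_bytes = 4096, so hdr_end lands 12 KiB past dreq->io_start;
      * max_count is clamped from 16 KiB to 12 KiB, the EOF clears any
      * pending error, and dreq->count (the value nfs_direct_wait()
      * eventually returns) ends up at 12 KiB.
      */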
0104 static void
0105 nfs_direct_count_bytes(struct nfs_direct_req *dreq,
0106                const struct nfs_pgio_header *hdr)
0107 {
0108     loff_t hdr_end = hdr->io_start + hdr->good_bytes;
0109     ssize_t dreq_len = 0;
0110 
0111     if (hdr_end > dreq->io_start)
0112         dreq_len = hdr_end - dreq->io_start;
0113 
0114     nfs_direct_handle_truncated(dreq, hdr, dreq_len);
0115 
0116     if (dreq_len > dreq->max_count)
0117         dreq_len = dreq->max_count;
0118 
0119     if (dreq->count < dreq_len)
0120         dreq->count = dreq_len;
0121 }
0122 
0123 /**
0124  * nfs_swap_rw - NFS address space operation for swap I/O
0125  * @iocb: target I/O control block
0126  * @iter: I/O buffer
0127  *
0128  * Perform IO to the swap-file.  This is much like direct IO.
0129  */
0130 int nfs_swap_rw(struct kiocb *iocb, struct iov_iter *iter)
0131 {
0132     ssize_t ret;
0133 
0134     VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);
0135 
0136     if (iov_iter_rw(iter) == READ)
0137         ret = nfs_file_direct_read(iocb, iter, true);
0138     else
0139         ret = nfs_file_direct_write(iocb, iter, true);
0140     if (ret < 0)
0141         return ret;
0142     return 0;
0143 }
0144 
0145 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
0146 {
0147     unsigned int i;
0148     for (i = 0; i < npages; i++)
0149         put_page(pages[i]);
0150 }
0151 
0152 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
0153                   struct nfs_direct_req *dreq)
0154 {
0155     cinfo->inode = dreq->inode;
0156     cinfo->mds = &dreq->mds_cinfo;
0157     cinfo->ds = &dreq->ds_cinfo;
0158     cinfo->dreq = dreq;
0159     cinfo->completion_ops = &nfs_direct_commit_completion_ops;
0160 }
0161 
0162 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
0163 {
0164     struct nfs_direct_req *dreq;
0165 
0166     dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
0167     if (!dreq)
0168         return NULL;
0169 
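         /*
          * Two references from the start: kref_init() covers the I/O,
          * dropped by nfs_direct_complete(), and kref_get() covers the
          * caller, dropped via nfs_direct_req_release() in
          * nfs_file_direct_read()/nfs_file_direct_write().
          */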
0170     kref_init(&dreq->kref);
0171     kref_get(&dreq->kref);
0172     init_completion(&dreq->completion);
0173     INIT_LIST_HEAD(&dreq->mds_cinfo.list);
0174     pnfs_init_ds_commit_info(&dreq->ds_cinfo);
0175     INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
0176     spin_lock_init(&dreq->lock);
0177 
0178     return dreq;
0179 }
0180 
0181 static void nfs_direct_req_free(struct kref *kref)
0182 {
0183     struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
0184 
0185     pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
0186     if (dreq->l_ctx != NULL)
0187         nfs_put_lock_context(dreq->l_ctx);
0188     if (dreq->ctx != NULL)
0189         put_nfs_open_context(dreq->ctx);
0190     kmem_cache_free(nfs_direct_cachep, dreq);
0191 }
0192 
0193 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
0194 {
0195     kref_put(&dreq->kref, nfs_direct_req_free);
0196 }
0197 
0198 ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
0199 {
0200     return dreq->bytes_left;
0201 }
0202 EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
0203 
0204 /*
0205  * Collects and returns the final error value/byte-count.
0206  */
0207 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
0208 {
0209     ssize_t result = -EIOCBQUEUED;
0210 
0211     /* Async requests don't wait here */
0212     if (dreq->iocb)
0213         goto out;
0214 
0215     result = wait_for_completion_killable(&dreq->completion);
0216 
0217     if (!result) {
0218         result = dreq->count;
0219         WARN_ON_ONCE(dreq->count < 0);
0220     }
0221     if (!result)
0222         result = dreq->error;
0223 
0224 out:
0225     return (ssize_t) result;
0226 }
0227 
0228 /*
0229  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
0230  * the iocb is still valid here if this is a synchronous request.
0231  */
0232 static void nfs_direct_complete(struct nfs_direct_req *dreq)
0233 {
0234     struct inode *inode = dreq->inode;
0235 
0236     inode_dio_end(inode);
0237 
0238     if (dreq->iocb) {
0239         long res = (long) dreq->error;
0240         if (dreq->count != 0) {
0241             res = (long) dreq->count;
0242             WARN_ON_ONCE(dreq->count < 0);
0243         }
0244         dreq->iocb->ki_complete(dreq->iocb, res);
0245     }
0246 
0247     complete(&dreq->completion);
0248 
0249     nfs_direct_req_release(dreq);
0250 }
0251 
0252 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
0253 {
0254     unsigned long bytes = 0;
0255     struct nfs_direct_req *dreq = hdr->dreq;
0256 
0257     spin_lock(&dreq->lock);
0258     if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
0259         spin_unlock(&dreq->lock);
0260         goto out_put;
0261     }
0262 
0263     nfs_direct_count_bytes(dreq, hdr);
0264     spin_unlock(&dreq->lock);
0265 
0266     while (!list_empty(&hdr->pages)) {
0267         struct nfs_page *req = nfs_list_entry(hdr->pages.next);
0268         struct page *page = req->wb_page;
0269 
0270         if (!PageCompound(page) && bytes < hdr->good_bytes &&
0271             (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
0272             set_page_dirty(page);
0273         bytes += req->wb_bytes;
0274         nfs_list_remove_request(req);
0275         nfs_release_request(req);
0276     }
0277 out_put:
0278     if (put_dreq(dreq))
0279         nfs_direct_complete(dreq);
0280     hdr->release(hdr);
0281 }
0282 
0283 static void nfs_read_sync_pgio_error(struct list_head *head, int error)
0284 {
0285     struct nfs_page *req;
0286 
0287     while (!list_empty(head)) {
0288         req = nfs_list_entry(head->next);
0289         nfs_list_remove_request(req);
0290         nfs_release_request(req);
0291     }
0292 }
0293 
0294 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
0295 {
0296     get_dreq(hdr->dreq);
0297 }
0298 
0299 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
0300     .error_cleanup = nfs_read_sync_pgio_error,
0301     .init_hdr = nfs_direct_pgio_init,
0302     .completion = nfs_direct_read_completion,
0303 };
0304 
0305 /*
0306  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
0307  * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
0308  * bail and stop sending more reads.  Read length accounting is
0309  * handled automatically by nfs_direct_read_result().  Otherwise, if
0310  * no requests have been sent, just return an error.
0311  */
0312 
0313 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
0314                           struct iov_iter *iter,
0315                           loff_t pos)
0316 {
0317     struct nfs_pageio_descriptor desc;
0318     struct inode *inode = dreq->inode;
0319     ssize_t result = -EINVAL;
0320     size_t requested_bytes = 0;
0321     size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);
0322 
0323     nfs_pageio_init_read(&desc, dreq->inode, false,
0324                  &nfs_direct_read_completion_ops);
0325     get_dreq(dreq);
0326     desc.pg_dreq = dreq;
0327     inode_dio_begin(inode);
0328 
0329     while (iov_iter_count(iter)) {
0330         struct page **pagevec;
0331         size_t bytes;
0332         size_t pgbase;
0333         unsigned npages, i;
0334 
0335         result = iov_iter_get_pages_alloc2(iter, &pagevec,
0336                           rsize, &pgbase);
0337         if (result < 0)
0338             break;
0339 
0340         bytes = result;
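             /*
              * Illustrative example: a 10000-byte segment starting 512
              * bytes into a page (pgbase = 512) spans 3 pages with 4 KiB
              * pages: (10000 + 512 + 4095) / 4096 = 3.
              */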
0341         npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
0342         for (i = 0; i < npages; i++) {
0343             struct nfs_page *req;
0344             unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
0345             /* XXX do we need to do the eof zeroing found in async_filler? */
0346             req = nfs_create_request(dreq->ctx, pagevec[i],
0347                          pgbase, req_len);
0348             if (IS_ERR(req)) {
0349                 result = PTR_ERR(req);
0350                 break;
0351             }
0352             req->wb_index = pos >> PAGE_SHIFT;
0353             req->wb_offset = pos & ~PAGE_MASK;
0354             if (!nfs_pageio_add_request(&desc, req)) {
0355                 result = desc.pg_error;
0356                 nfs_release_request(req);
0357                 break;
0358             }
0359             pgbase = 0;
0360             bytes -= req_len;
0361             requested_bytes += req_len;
0362             pos += req_len;
0363             dreq->bytes_left -= req_len;
0364         }
0365         nfs_direct_release_pages(pagevec, npages);
0366         kvfree(pagevec);
0367         if (result < 0)
0368             break;
0369     }
0370 
0371     nfs_pageio_complete(&desc);
0372 
0373     /*
0374      * If no bytes were started, return the error, and let the
0375      * generic layer handle the completion.
0376      */
0377     if (requested_bytes == 0) {
0378         inode_dio_end(inode);
0379         nfs_direct_req_release(dreq);
0380         return result < 0 ? result : -EIO;
0381     }
0382 
0383     if (put_dreq(dreq))
0384         nfs_direct_complete(dreq);
0385     return requested_bytes;
0386 }
0387 
0388 /**
0389  * nfs_file_direct_read - file direct read operation for NFS files
0390  * @iocb: target I/O control block
0391  * @iter: vector of user buffers into which to read data
0392  * @swap: flag indicating this is swap IO, not O_DIRECT IO
0393  *
0394  * We use this function for direct reads instead of calling
0395  * generic_file_aio_read() in order to avoid gfar's check to see if
0396  * the request starts before the end of the file.  For that check
0397  * to work, we must generate a GETATTR before each direct read, and
0398  * even then there is a window between the GETATTR and the subsequent
0399  * READ where the file size could change.  Our preference is simply
0400  * to do all reads the application wants, and the server will take
0401  * care of managing the end of file boundary.
0402  *
0403  * This function also eliminates unnecessarily updating the file's
0404  * atime locally, as the NFS server sets the file's atime, and this
0405  * client must read the updated atime from the server back into its
0406  * cache.
0407  */
0408 ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
0409                  bool swap)
0410 {
0411     struct file *file = iocb->ki_filp;
0412     struct address_space *mapping = file->f_mapping;
0413     struct inode *inode = mapping->host;
0414     struct nfs_direct_req *dreq;
0415     struct nfs_lock_context *l_ctx;
0416     ssize_t result, requested;
0417     size_t count = iov_iter_count(iter);
0418     nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
0419 
0420     dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
0421         file, count, (long long) iocb->ki_pos);
0422 
0423     result = 0;
0424     if (!count)
0425         goto out;
0426 
0427     task_io_account_read(count);
0428 
0429     result = -ENOMEM;
0430     dreq = nfs_direct_req_alloc();
0431     if (dreq == NULL)
0432         goto out;
0433 
0434     dreq->inode = inode;
0435     dreq->bytes_left = dreq->max_count = count;
0436     dreq->io_start = iocb->ki_pos;
0437     dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
0438     l_ctx = nfs_get_lock_context(dreq->ctx);
0439     if (IS_ERR(l_ctx)) {
0440         result = PTR_ERR(l_ctx);
0441         nfs_direct_req_release(dreq);
0442         goto out_release;
0443     }
0444     dreq->l_ctx = l_ctx;
0445     if (!is_sync_kiocb(iocb))
0446         dreq->iocb = iocb;
0447 
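         /*
          * Reads into user-backed memory land straight in the pages
          * grabbed by iov_iter_get_pages_alloc2(); NFS_ODIRECT_SHOULD_DIRTY
          * makes the read completion call set_page_dirty() on them so the
          * new data is not lost.  Kernel-backed iterators (such as the
          * swap path above) skip this.
          */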
0448     if (user_backed_iter(iter))
0449         dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;
0450 
0451     if (!swap)
0452         nfs_start_io_direct(inode);
0453 
0454     NFS_I(inode)->read_io += count;
0455     requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
0456 
0457     if (!swap)
0458         nfs_end_io_direct(inode);
0459 
0460     if (requested > 0) {
0461         result = nfs_direct_wait(dreq);
0462         if (result > 0) {
0463             requested -= result;
0464             iocb->ki_pos += result;
0465         }
0466         iov_iter_revert(iter, requested);
0467     } else {
0468         result = requested;
0469     }
0470 
0471 out_release:
0472     nfs_direct_req_release(dreq);
0473 out:
0474     return result;
0475 }
0476 
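     /*
      * Before a resend, collapse each page group back onto its head
      * request: every sub-request other than the head is dropped, then
      * nfs_join_page_group() rejoins the head so the group can be
      * rescheduled as a single request.
      */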
0477 static void
0478 nfs_direct_join_group(struct list_head *list, struct inode *inode)
0479 {
0480     struct nfs_page *req, *next;
0481 
0482     list_for_each_entry(req, list, wb_list) {
0483         if (req->wb_head != req || req->wb_this_page == req)
0484             continue;
0485         for (next = req->wb_this_page;
0486                 next != req->wb_head;
0487                 next = next->wb_this_page) {
0488             nfs_list_remove_request(next);
0489             nfs_release_request(next);
0490         }
0491         nfs_join_page_group(req, inode);
0492     }
0493 }
0494 
0495 static void
0496 nfs_direct_write_scan_commit_list(struct inode *inode,
0497                   struct list_head *list,
0498                   struct nfs_commit_info *cinfo)
0499 {
0500     mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
0501     pnfs_recover_commit_reqs(list, cinfo);
0502     nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
0503     mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
0504 }
0505 
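     /*
      * Resend the writes belonging to this request: pull them back off
      * the commit lists, rejoin the page groups, and feed them through a
      * fresh FLUSH_STABLE pageio descriptor.  Anything that cannot be
      * re-queued is unlocked and released, and dreq->error is set from
      * the pageio error (or -EIO).
      */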
0506 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
0507 {
0508     struct nfs_pageio_descriptor desc;
0509     struct nfs_page *req, *tmp;
0510     LIST_HEAD(reqs);
0511     struct nfs_commit_info cinfo;
0512     LIST_HEAD(failed);
0513 
0514     nfs_init_cinfo_from_dreq(&cinfo, dreq);
0515     nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
0516 
0517     nfs_direct_join_group(&reqs, dreq->inode);
0518 
0519     dreq->count = 0;
0520     dreq->max_count = 0;
0521     list_for_each_entry(req, &reqs, wb_list)
0522         dreq->max_count += req->wb_bytes;
0523     nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
0524     get_dreq(dreq);
0525 
0526     nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
0527                   &nfs_direct_write_completion_ops);
0528     desc.pg_dreq = dreq;
0529 
0530     list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
0531         /* Bump the transmission count */
0532         req->wb_nio++;
0533         if (!nfs_pageio_add_request(&desc, req)) {
0534             nfs_list_move_request(req, &failed);
0535             spin_lock(&cinfo.inode->i_lock);
0536             dreq->flags = 0;
0537             if (desc.pg_error < 0)
0538                 dreq->error = desc.pg_error;
0539             else
0540                 dreq->error = -EIO;
0541             spin_unlock(&cinfo.inode->i_lock);
0542         }
0543         nfs_release_request(req);
0544     }
0545     nfs_pageio_complete(&desc);
0546 
0547     while (!list_empty(&failed)) {
0548         req = nfs_list_entry(failed.next);
0549         nfs_list_remove_request(req);
0550         nfs_unlock_and_release_request(req);
0551     }
0552 
0553     if (put_dreq(dreq))
0554         nfs_direct_write_complete(dreq);
0555 }
0556 
0557 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
0558 {
0559     const struct nfs_writeverf *verf = data->res.verf;
0560     struct nfs_direct_req *dreq = data->dreq;
0561     struct nfs_commit_info cinfo;
0562     struct nfs_page *req;
0563     int status = data->task.tk_status;
0564 
0565     trace_nfs_direct_commit_complete(dreq);
0566 
0567     if (status < 0) {
0568         /* Errors in commit are fatal */
0569         dreq->error = status;
0570         dreq->max_count = 0;
0571         dreq->count = 0;
0572         dreq->flags = NFS_ODIRECT_DONE;
0573     } else {
0574         status = dreq->error;
0575     }
0576 
0577     nfs_init_cinfo_from_dreq(&cinfo, dreq);
0578 
0579     while (!list_empty(&data->pages)) {
0580         req = nfs_list_entry(data->pages.next);
0581         nfs_list_remove_request(req);
0582         if (status >= 0 && !nfs_write_match_verf(verf, req)) {
0583             dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
0584             /*
0585              * Despite the reboot, the write was successful,
0586              * so reset wb_nio.
0587              */
0588             req->wb_nio = 0;
0589             nfs_mark_request_commit(req, NULL, &cinfo, 0);
0590         } else /* Error or match */
0591             nfs_release_request(req);
0592         nfs_unlock_and_release_request(req);
0593     }
0594 
0595     if (nfs_commit_end(cinfo.mds))
0596         nfs_direct_write_complete(dreq);
0597 }
0598 
0599 static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
0600         struct nfs_page *req)
0601 {
0602     struct nfs_direct_req *dreq = cinfo->dreq;
0603 
0604     trace_nfs_direct_resched_write(dreq);
0605 
0606     spin_lock(&dreq->lock);
0607     if (dreq->flags != NFS_ODIRECT_DONE)
0608         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
0609     spin_unlock(&dreq->lock);
0610     nfs_mark_request_commit(req, NULL, cinfo, 0);
0611 }
0612 
0613 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
0614     .completion = nfs_direct_commit_complete,
0615     .resched_write = nfs_direct_resched_write,
0616 };
0617 
0618 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
0619 {
0620     int res;
0621     struct nfs_commit_info cinfo;
0622     LIST_HEAD(mds_list);
0623 
0624     nfs_init_cinfo_from_dreq(&cinfo, dreq);
0625     nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
0626     res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
0627     if (res < 0) /* res == -ENOMEM */
0628         nfs_direct_write_reschedule(dreq);
0629 }
0630 
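     /*
      * Drop whatever is still sitting on the commit lists without
      * resending it; called from the default branch of
      * nfs_direct_write_schedule_work() when the direct write is being
      * torn down.
      */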
0631 static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
0632 {
0633     struct nfs_commit_info cinfo;
0634     struct nfs_page *req;
0635     LIST_HEAD(reqs);
0636 
0637     nfs_init_cinfo_from_dreq(&cinfo, dreq);
0638     nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
0639 
0640     while (!list_empty(&reqs)) {
0641         req = nfs_list_entry(reqs.next);
0642         nfs_list_remove_request(req);
0643         nfs_release_request(req);
0644         nfs_unlock_and_release_request(req);
0645     }
0646 }
0647 
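     /*
      * Deferred completion state machine, run from the nfsiod workqueue:
      * NFS_ODIRECT_DO_COMMIT sends a COMMIT for unstable writes,
      * NFS_ODIRECT_RESCHED_WRITES resends the data (for example after a
      * commit verifier mismatch), and anything else tears down leftover
      * requests, invalidates the mapping and completes the request.
      */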
0648 static void nfs_direct_write_schedule_work(struct work_struct *work)
0649 {
0650     struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
0651     int flags = dreq->flags;
0652 
0653     dreq->flags = 0;
0654     switch (flags) {
0655         case NFS_ODIRECT_DO_COMMIT:
0656             nfs_direct_commit_schedule(dreq);
0657             break;
0658         case NFS_ODIRECT_RESCHED_WRITES:
0659             nfs_direct_write_reschedule(dreq);
0660             break;
0661         default:
0662             nfs_direct_write_clear_reqs(dreq);
0663             nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
0664             nfs_direct_complete(dreq);
0665     }
0666 }
0667 
0668 static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
0669 {
0670     trace_nfs_direct_write_complete(dreq);
0671     queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
0672 }
0673 
0674 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
0675 {
0676     struct nfs_direct_req *dreq = hdr->dreq;
0677     struct nfs_commit_info cinfo;
0678     struct nfs_page *req = nfs_list_entry(hdr->pages.next);
0679     int flags = NFS_ODIRECT_DONE;
0680 
0681     trace_nfs_direct_write_completion(dreq);
0682 
0683     nfs_init_cinfo_from_dreq(&cinfo, dreq);
0684 
0685     spin_lock(&dreq->lock);
0686     if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
0687         spin_unlock(&dreq->lock);
0688         goto out_put;
0689     }
0690 
0691     nfs_direct_count_bytes(dreq, hdr);
0692     if (test_bit(NFS_IOHDR_UNSTABLE_WRITES, &hdr->flags)) {
0693         if (!dreq->flags)
0694             dreq->flags = NFS_ODIRECT_DO_COMMIT;
0695         flags = dreq->flags;
0696     }
0697     spin_unlock(&dreq->lock);
0698 
0699     while (!list_empty(&hdr->pages)) {
0700 
0701         req = nfs_list_entry(hdr->pages.next);
0702         nfs_list_remove_request(req);
0703         if (flags == NFS_ODIRECT_DO_COMMIT) {
0704             kref_get(&req->wb_kref);
0705             memcpy(&req->wb_verf, &hdr->verf.verifier,
0706                    sizeof(req->wb_verf));
0707             nfs_mark_request_commit(req, hdr->lseg, &cinfo,
0708                 hdr->ds_commit_idx);
0709         } else if (flags == NFS_ODIRECT_RESCHED_WRITES) {
0710             kref_get(&req->wb_kref);
0711             nfs_mark_request_commit(req, NULL, &cinfo, 0);
0712         }
0713         nfs_unlock_and_release_request(req);
0714     }
0715 
0716 out_put:
0717     if (put_dreq(dreq))
0718         nfs_direct_write_complete(dreq);
0719     hdr->release(hdr);
0720 }
0721 
0722 static void nfs_write_sync_pgio_error(struct list_head *head, int error)
0723 {
0724     struct nfs_page *req;
0725 
0726     while (!list_empty(head)) {
0727         req = nfs_list_entry(head->next);
0728         nfs_list_remove_request(req);
0729         nfs_unlock_and_release_request(req);
0730     }
0731 }
0732 
0733 static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
0734 {
0735     struct nfs_direct_req *dreq = hdr->dreq;
0736 
0737     trace_nfs_direct_write_reschedule_io(dreq);
0738 
0739     spin_lock(&dreq->lock);
0740     if (dreq->error == 0) {
0741         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
0742         /* fake unstable write to let common nfs resend pages */
0743         hdr->verf.committed = NFS_UNSTABLE;
0744         hdr->good_bytes = hdr->args.offset + hdr->args.count -
0745             hdr->io_start;
0746     }
0747     spin_unlock(&dreq->lock);
0748 }
0749 
0750 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
0751     .error_cleanup = nfs_write_sync_pgio_error,
0752     .init_hdr = nfs_direct_pgio_init,
0753     .completion = nfs_direct_write_completion,
0754     .reschedule_io = nfs_direct_write_reschedule_io,
0755 };
0756 
0757 
0758 /*
0759  * NB: Return the value of the first error return code.  Subsequent
0760  *     errors after the first one are ignored.
0761  */
0762 /*
0763  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
0764  * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
0765  * bail and stop sending more writes.  Write length accounting is
0766  * handled automatically by nfs_direct_write_result().  Otherwise, if
0767  * no requests have been sent, just return an error.
0768  */
0769 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
0770                            struct iov_iter *iter,
0771                            loff_t pos, int ioflags)
0772 {
0773     struct nfs_pageio_descriptor desc;
0774     struct inode *inode = dreq->inode;
0775     ssize_t result = 0;
0776     size_t requested_bytes = 0;
0777     size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
0778 
0779     trace_nfs_direct_write_schedule_iovec(dreq);
0780 
0781     nfs_pageio_init_write(&desc, inode, ioflags, false,
0782                   &nfs_direct_write_completion_ops);
0783     desc.pg_dreq = dreq;
0784     get_dreq(dreq);
0785     inode_dio_begin(inode);
0786 
0787     NFS_I(inode)->write_io += iov_iter_count(iter);
0788     while (iov_iter_count(iter)) {
0789         struct page **pagevec;
0790         size_t bytes;
0791         size_t pgbase;
0792         unsigned npages, i;
0793 
0794         result = iov_iter_get_pages_alloc2(iter, &pagevec,
0795                           wsize, &pgbase);
0796         if (result < 0)
0797             break;
0798 
0799         bytes = result;
0800         npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
0801         for (i = 0; i < npages; i++) {
0802             struct nfs_page *req;
0803             unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
0804 
0805             req = nfs_create_request(dreq->ctx, pagevec[i],
0806                          pgbase, req_len);
0807             if (IS_ERR(req)) {
0808                 result = PTR_ERR(req);
0809                 break;
0810             }
0811 
0812             if (desc.pg_error < 0) {
0813                 nfs_free_request(req);
0814                 result = desc.pg_error;
0815                 break;
0816             }
0817 
0818             nfs_lock_request(req);
0819             req->wb_index = pos >> PAGE_SHIFT;
0820             req->wb_offset = pos & ~PAGE_MASK;
0821             if (!nfs_pageio_add_request(&desc, req)) {
0822                 result = desc.pg_error;
0823                 nfs_unlock_and_release_request(req);
0824                 break;
0825             }
0826             pgbase = 0;
0827             bytes -= req_len;
0828             requested_bytes += req_len;
0829             pos += req_len;
0830             dreq->bytes_left -= req_len;
0831         }
0832         nfs_direct_release_pages(pagevec, npages);
0833         kvfree(pagevec);
0834         if (result < 0)
0835             break;
0836     }
0837     nfs_pageio_complete(&desc);
0838 
0839     /*
0840      * If no bytes were started, return the error, and let the
0841      * generic layer handle the completion.
0842      */
0843     if (requested_bytes == 0) {
0844         inode_dio_end(inode);
0845         nfs_direct_req_release(dreq);
0846         return result < 0 ? result : -EIO;
0847     }
0848 
0849     if (put_dreq(dreq))
0850         nfs_direct_write_complete(dreq);
0851     return requested_bytes;
0852 }
0853 
0854 /**
0855  * nfs_file_direct_write - file direct write operation for NFS files
0856  * @iocb: target I/O control block
0857  * @iter: vector of user buffers from which to write data
0858  * @swap: flag indicating this is swap IO, not O_DIRECT IO
0859  *
0860  * We use this function for direct writes instead of calling
0861  * generic_file_aio_write() in order to avoid taking the inode
0862  * semaphore and updating the i_size.  The NFS server will set
0863  * the new i_size and this client must read the updated size
0864  * back into its cache.  We let the server do generic write
0865  * parameter checking and report problems.
0866  *
0867  * We eliminate local atime updates, see direct read above.
0868  *
0869  * We avoid unnecessary page cache invalidations for normal cached
0870  * readers of this file.
0871  *
0872  * Note that O_APPEND is not supported for NFS direct writes, as there
0873  * is no atomic O_APPEND write facility in the NFS protocol.
0874  */
0875 ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter,
0876                   bool swap)
0877 {
0878     ssize_t result, requested;
0879     size_t count;
0880     struct file *file = iocb->ki_filp;
0881     struct address_space *mapping = file->f_mapping;
0882     struct inode *inode = mapping->host;
0883     struct nfs_direct_req *dreq;
0884     struct nfs_lock_context *l_ctx;
0885     loff_t pos, end;
0886 
0887     dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
0888         file, iov_iter_count(iter), (long long) iocb->ki_pos);
0889 
0890     if (swap)
0891         /* bypass generic checks */
0892         result = iov_iter_count(iter);
0893     else
0894         result = generic_write_checks(iocb, iter);
0895     if (result <= 0)
0896         return result;
0897     count = result;
0898     nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
0899 
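         /*
          * 'end' is the page index of the last byte this write touches;
          * it bounds the invalidate_inode_pages2_range() call below that
          * drops any cached pages overlapping the written range.
          */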
0900     pos = iocb->ki_pos;
0901     end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;
0902 
0903     task_io_account_write(count);
0904 
0905     result = -ENOMEM;
0906     dreq = nfs_direct_req_alloc();
0907     if (!dreq)
0908         goto out;
0909 
0910     dreq->inode = inode;
0911     dreq->bytes_left = dreq->max_count = count;
0912     dreq->io_start = pos;
0913     dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
0914     l_ctx = nfs_get_lock_context(dreq->ctx);
0915     if (IS_ERR(l_ctx)) {
0916         result = PTR_ERR(l_ctx);
0917         nfs_direct_req_release(dreq);
0918         goto out_release;
0919     }
0920     dreq->l_ctx = l_ctx;
0921     if (!is_sync_kiocb(iocb))
0922         dreq->iocb = iocb;
0923     pnfs_init_ds_commit_info_ops(&dreq->ds_cinfo, inode);
0924 
0925     if (swap) {
0926         requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
0927                                 FLUSH_STABLE);
0928     } else {
0929         nfs_start_io_direct(inode);
0930 
0931         requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
0932                                 FLUSH_COND_STABLE);
0933 
0934         if (mapping->nrpages) {
0935             invalidate_inode_pages2_range(mapping,
0936                               pos >> PAGE_SHIFT, end);
0937         }
0938 
0939         nfs_end_io_direct(inode);
0940     }
0941 
0942     if (requested > 0) {
0943         result = nfs_direct_wait(dreq);
0944         if (result > 0) {
0945             requested -= result;
0946             iocb->ki_pos = pos + result;
0947             /* XXX: should check the generic_write_sync retval */
0948             generic_write_sync(iocb, result);
0949         }
0950         iov_iter_revert(iter, requested);
0951     } else {
0952         result = requested;
0953     }
0954     nfs_fscache_invalidate(inode, FSCACHE_INVAL_DIO_WRITE);
0955 out_release:
0956     nfs_direct_req_release(dreq);
0957 out:
0958     return result;
0959 }
0960 
0961 /**
0962  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
0963  *
0964  */
0965 int __init nfs_init_directcache(void)
0966 {
0967     nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
0968                         sizeof(struct nfs_direct_req),
0969                         0, (SLAB_RECLAIM_ACCOUNT|
0970                             SLAB_MEM_SPREAD),
0971                         NULL);
0972     if (nfs_direct_cachep == NULL)
0973         return -ENOMEM;
0974 
0975     return 0;
0976 }
0977 
0978 /**
0979  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
0980  *
0981  */
0982 void nfs_destroy_directcache(void)
0983 {
0984     kmem_cache_destroy(nfs_direct_cachep);
0985 }