// SPDX-License-Identifier: GPL-2.0
/*
 * Device operations for the pnfs nfs4 file layout driver.
 *
 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
 *
 * Tao Peng <bergwolf@primarydata.com>
 */

#include <linux/nfs_fs.h>
#include <linux/vmalloc.h>
#include <linux/module.h>
#include <linux/sunrpc/addr.h>

#include "../internal.h"
#include "../nfs4session.h"
#include "flexfilelayout.h"

#define NFSDBG_FACILITY     NFSDBG_PNFS_LD

static unsigned int dataserver_timeo = NFS_DEF_TCP_TIMEO;
static unsigned int dataserver_retrans;

static bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg);

void nfs4_ff_layout_put_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
{
    if (!IS_ERR_OR_NULL(mirror_ds))
        nfs4_put_deviceid_node(&mirror_ds->id_node);
}

void nfs4_ff_layout_free_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
{
    nfs4_print_deviceid(&mirror_ds->id_node.deviceid);
    nfs4_pnfs_ds_put(mirror_ds->ds);
    kfree(mirror_ds->ds_versions);
    kfree_rcu(mirror_ds, id_node.rcu);
}

/* Decode opaque device data and construct new_ds using it */
struct nfs4_ff_layout_ds *
nfs4_ff_alloc_deviceid_node(struct nfs_server *server, struct pnfs_device *pdev,
                gfp_t gfp_flags)
{
    struct xdr_stream stream;
    struct xdr_buf buf;
    struct page *scratch;
    struct list_head dsaddrs;
    struct nfs4_pnfs_ds_addr *da;
    struct nfs4_ff_layout_ds *new_ds = NULL;
    struct nfs4_ff_ds_version *ds_versions = NULL;
    u32 mp_count;
    u32 version_count;
    __be32 *p;
    int i, ret = -ENOMEM;

    /* set up xdr stream */
    scratch = alloc_page(gfp_flags);
    if (!scratch)
        goto out_err;

    new_ds = kzalloc(sizeof(struct nfs4_ff_layout_ds), gfp_flags);
    if (!new_ds)
        goto out_scratch;

    nfs4_init_deviceid_node(&new_ds->id_node,
                server,
                &pdev->dev_id);
    INIT_LIST_HEAD(&dsaddrs);

    xdr_init_decode_pages(&stream, &buf, pdev->pages, pdev->pglen);
    xdr_set_scratch_page(&stream, scratch);

    /* multipath count */
    p = xdr_inline_decode(&stream, 4);
    if (unlikely(!p))
        goto out_err_drain_dsaddrs;
    mp_count = be32_to_cpup(p);
    dprintk("%s: multipath ds count %d\n", __func__, mp_count);

    for (i = 0; i < mp_count; i++) {
        /* multipath ds */
        da = nfs4_decode_mp_ds_addr(server->nfs_client->cl_net,
                        &stream, gfp_flags);
        if (da)
            list_add_tail(&da->da_node, &dsaddrs);
    }
    if (list_empty(&dsaddrs)) {
        dprintk("%s: no suitable DS addresses found\n",
            __func__);
        ret = -ENOMEDIUM;
        goto out_err_drain_dsaddrs;
    }

    /* version count */
    p = xdr_inline_decode(&stream, 4);
    if (unlikely(!p))
        goto out_err_drain_dsaddrs;
    version_count = be32_to_cpup(p);
    dprintk("%s: version count %d\n", __func__, version_count);

    ds_versions = kcalloc(version_count,
                  sizeof(struct nfs4_ff_ds_version),
                  gfp_flags);
    /* dsaddrs is non-empty here, so drain it on failure */
    if (!ds_versions)
        goto out_err_drain_dsaddrs;

    for (i = 0; i < version_count; i++) {
        /* 20 = version(4) + minor_version(4) + rsize(4) + wsize(4) +
         * tightly_coupled(4) */
        p = xdr_inline_decode(&stream, 20);
        if (unlikely(!p))
            goto out_err_drain_dsaddrs;
        ds_versions[i].version = be32_to_cpup(p++);
        ds_versions[i].minor_version = be32_to_cpup(p++);
        ds_versions[i].rsize = nfs_io_size(be32_to_cpup(p++),
                           server->nfs_client->cl_proto);
        ds_versions[i].wsize = nfs_io_size(be32_to_cpup(p++),
                           server->nfs_client->cl_proto);
        ds_versions[i].tightly_coupled = be32_to_cpup(p);

        if (ds_versions[i].rsize > NFS_MAX_FILE_IO_SIZE)
            ds_versions[i].rsize = NFS_MAX_FILE_IO_SIZE;
        if (ds_versions[i].wsize > NFS_MAX_FILE_IO_SIZE)
            ds_versions[i].wsize = NFS_MAX_FILE_IO_SIZE;

        /*
         * Check for a valid major/minor combination.
         * Currently we support data servers that talk:
         *   v3, v4.0, v4.1, v4.2
         */
        if (!((ds_versions[i].version == 3 && ds_versions[i].minor_version == 0) ||
            (ds_versions[i].version == 4 && ds_versions[i].minor_version < 3))) {
            dprintk("%s: [%d] unsupported ds version %d-%d\n", __func__,
                i, ds_versions[i].version,
                ds_versions[i].minor_version);
            ret = -EPROTONOSUPPORT;
            goto out_err_drain_dsaddrs;
        }

        dprintk("%s: [%d] vers %u minor_ver %u rsize %u wsize %u coupled %d\n",
            __func__, i, ds_versions[i].version,
            ds_versions[i].minor_version,
            ds_versions[i].rsize,
            ds_versions[i].wsize,
            ds_versions[i].tightly_coupled);
    }

    new_ds->ds_versions = ds_versions;
    new_ds->ds_versions_cnt = version_count;

    new_ds->ds = nfs4_pnfs_ds_add(&dsaddrs, gfp_flags);
    if (!new_ds->ds)
        goto out_err_drain_dsaddrs;

    /* If DS was already in cache, free ds addrs */
    while (!list_empty(&dsaddrs)) {
        da = list_first_entry(&dsaddrs,
                      struct nfs4_pnfs_ds_addr,
                      da_node);
        list_del_init(&da->da_node);
        kfree(da->da_remotestr);
        kfree(da);
    }

    __free_page(scratch);
    return new_ds;

out_err_drain_dsaddrs:
    while (!list_empty(&dsaddrs)) {
        da = list_first_entry(&dsaddrs, struct nfs4_pnfs_ds_addr,
                      da_node);
        list_del_init(&da->da_node);
        kfree(da->da_remotestr);
        kfree(da);
    }

    kfree(ds_versions);
out_scratch:
    __free_page(scratch);
out_err:
    kfree(new_ds);

    dprintk("%s ERROR: returning %d\n", __func__, ret);
    return NULL;
}

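/*
 * Grow an existing error record so that it covers the union of its
 * current range and the new [offset, offset + length) range.
 */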
static void extend_ds_error(struct nfs4_ff_layout_ds_err *err,
                u64 offset, u64 length)
{
    u64 end;

    end = max_t(u64, pnfs_end_offset(err->offset, err->length),
            pnfs_end_offset(offset, length));
    err->offset = min_t(u64, err->offset, offset);
    err->length = end - err->offset;
}

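/*
 * Order two error records for the sorted error list.  Returns <0 or >0
 * according to opnum, status, stateid, deviceid and range, and 0 when
 * the records differ only by ranges that overlap or are contiguous,
 * i.e. when they can be merged into a single record.
 */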
static int
ff_ds_error_match(const struct nfs4_ff_layout_ds_err *e1,
        const struct nfs4_ff_layout_ds_err *e2)
{
    int ret;

    if (e1->opnum != e2->opnum)
        return e1->opnum < e2->opnum ? -1 : 1;
    if (e1->status != e2->status)
        return e1->status < e2->status ? -1 : 1;
    ret = memcmp(e1->stateid.data, e2->stateid.data,
            sizeof(e1->stateid.data));
    if (ret != 0)
        return ret;
    ret = memcmp(&e1->deviceid, &e2->deviceid, sizeof(e1->deviceid));
    if (ret != 0)
        return ret;
    if (pnfs_end_offset(e1->offset, e1->length) < e2->offset)
        return -1;
    if (e1->offset > pnfs_end_offset(e2->offset, e2->length))
        return 1;
    /* If ranges overlap or are contiguous, they are the same */
    return 0;
}

static void
ff_layout_add_ds_error_locked(struct nfs4_flexfile_layout *flo,
                  struct nfs4_ff_layout_ds_err *dserr)
{
    struct nfs4_ff_layout_ds_err *err, *tmp;
    struct list_head *head = &flo->error_list;
    int match;

    /* Do insertion sort w/ merges */
    list_for_each_entry_safe(err, tmp, &flo->error_list, list) {
        match = ff_ds_error_match(err, dserr);
        if (match < 0)
            continue;
        if (match > 0) {
            /* Add entry "dserr" _before_ entry "err" */
            head = &err->list;
            break;
        }
        /* Entries match, so merge "err" into "dserr" */
        extend_ds_error(dserr, err->offset, err->length);
        list_replace(&err->list, &dserr->list);
        kfree(err);
        return;
    }

    list_add_tail(&dserr->list, head);
}

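/*
 * Record a DS I/O error against @mirror's deviceid so that it can later
 * be reported back to the MDS.  The new entry is merged into the
 * layout's sorted error list under the inode's i_lock.
 */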
int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
                 struct nfs4_ff_layout_mirror *mirror, u64 offset,
                 u64 length, int status, enum nfs_opnum4 opnum,
                 gfp_t gfp_flags)
{
    struct nfs4_ff_layout_ds_err *dserr;

    if (status == 0)
        return 0;

    if (IS_ERR_OR_NULL(mirror->mirror_ds))
        return -EINVAL;

    dserr = kmalloc(sizeof(*dserr), gfp_flags);
    if (!dserr)
        return -ENOMEM;

    INIT_LIST_HEAD(&dserr->list);
    dserr->offset = offset;
    dserr->length = length;
    dserr->status = status;
    dserr->opnum = opnum;
    nfs4_stateid_copy(&dserr->stateid, &mirror->stateid);
    memcpy(&dserr->deviceid, &mirror->mirror_ds->id_node.deviceid,
           NFS4_DEVICEID4_SIZE);

    spin_lock(&flo->generic_hdr.plh_inode->i_lock);
    ff_layout_add_ds_error_locked(flo, dserr);
    spin_unlock(&flo->generic_hdr.plh_inode->i_lock);
    return 0;
}

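/*
 * Take a reference to the mirror's read or write credential, depending
 * on @iomode.  Returns NULL if no credential has been set for the mirror.
 */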
static const struct cred *
ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode)
{
    const struct cred *cred, __rcu **pcred;

    if (iomode == IOMODE_READ)
        pcred = &mirror->ro_cred;
    else
        pcred = &mirror->rw_cred;

    rcu_read_lock();
    do {
        cred = rcu_dereference(*pcred);
        if (!cred)
            break;

        cred = get_cred_rcu(cred);
    } while (!cred);
    rcu_read_unlock();
    return cred;
}

struct nfs_fh *
nfs4_ff_layout_select_ds_fh(struct nfs4_ff_layout_mirror *mirror)
{
    /* FIXME: For now assume there is only 1 version available for the DS */
    return &mirror->fh_versions[0];
}

void
nfs4_ff_layout_select_ds_stateid(const struct nfs4_ff_layout_mirror *mirror,
        nfs4_stateid *stateid)
{
    if (nfs4_ff_layout_ds_version(mirror) == 4)
        nfs4_stateid_copy(stateid, &mirror->stateid);
}

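/*
 * Lazily look up the deviceid node for @mirror and attach it as
 * mirror->mirror_ds.  The cmpxchg() guards against a concurrent caller
 * doing the same lookup.  Returns true if the mirror has a usable DS.
 */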
static bool
ff_layout_init_mirror_ds(struct pnfs_layout_hdr *lo,
             struct nfs4_ff_layout_mirror *mirror)
{
    if (mirror == NULL)
        goto outerr;
    if (mirror->mirror_ds == NULL) {
        struct nfs4_deviceid_node *node;
        struct nfs4_ff_layout_ds *mirror_ds = ERR_PTR(-ENODEV);

        node = nfs4_find_get_deviceid(NFS_SERVER(lo->plh_inode),
                &mirror->devid, lo->plh_lc_cred,
                GFP_KERNEL);
        if (node)
            mirror_ds = FF_LAYOUT_MIRROR_DS(node);

        /* check for race with another call to this function */
        if (cmpxchg(&mirror->mirror_ds, NULL, mirror_ds) &&
            mirror_ds != ERR_PTR(-ENODEV))
            nfs4_put_deviceid_node(node);
    }

    if (IS_ERR(mirror->mirror_ds))
        goto outerr;

    return true;
outerr:
    return false;
}

/**
 * nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call
 * @lseg: the layout segment we're operating on
 * @mirror: layout mirror describing the DS to use
 * @fail_return: return layout on connect failure?
 *
 * Try to prepare a DS connection to accept an RPC call. This involves
 * selecting a mirror to use and connecting the client to it if it's not
 * already connected.
 *
 * Since we only need a single functioning mirror to satisfy a read, we don't
 * want to return the layout while at least one mirror is still functioning.
 * For writes, though, any down mirror should result in a LAYOUTRETURN.
 * @fail_return is how we distinguish between the two cases.
 *
 * Returns a pointer to a connected DS object on success or NULL on failure.
 */
struct nfs4_pnfs_ds *
nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg,
              struct nfs4_ff_layout_mirror *mirror,
              bool fail_return)
{
    struct nfs4_pnfs_ds *ds = NULL;
    struct inode *ino = lseg->pls_layout->plh_inode;
    struct nfs_server *s = NFS_SERVER(ino);
    unsigned int max_payload;
    int status;

    if (!ff_layout_init_mirror_ds(lseg->pls_layout, mirror))
        goto noconnect;

    ds = mirror->mirror_ds->ds;
    if (READ_ONCE(ds->ds_clp))
        goto out;
    /* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
    smp_rmb();

    /* FIXME: For now we assume the server sent only one version of NFS
     * to use for the DS.
     */
    status = nfs4_pnfs_ds_connect(s, ds, &mirror->mirror_ds->id_node,
                 dataserver_timeo, dataserver_retrans,
                 mirror->mirror_ds->ds_versions[0].version,
                 mirror->mirror_ds->ds_versions[0].minor_version);

    /* connect success, check rsize/wsize limit */
    if (!status) {
        max_payload =
            nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
                       NULL);
        if (mirror->mirror_ds->ds_versions[0].rsize > max_payload)
            mirror->mirror_ds->ds_versions[0].rsize = max_payload;
        if (mirror->mirror_ds->ds_versions[0].wsize > max_payload)
            mirror->mirror_ds->ds_versions[0].wsize = max_payload;
        goto out;
    }
noconnect:
    ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
                 mirror, lseg->pls_range.offset,
                 lseg->pls_range.length, NFS4ERR_NXIO,
                 OP_ILLEGAL, GFP_NOIO);
    ff_layout_send_layouterror(lseg);
    if (fail_return || !ff_layout_has_available_ds(lseg))
        pnfs_error_mark_layout_for_return(ino, lseg);
    ds = NULL;
out:
    return ds;
}

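/*
 * Select the credential to use for I/O to the DS.  Loosely coupled
 * mirrors carry their own per-iomode credential; tightly coupled ones
 * (and missing mirrors) reuse the MDS credential.  The caller must put
 * the returned credential.
 */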
const struct cred *
ff_layout_get_ds_cred(struct nfs4_ff_layout_mirror *mirror,
              const struct pnfs_layout_range *range,
              const struct cred *mdscred)
{
    const struct cred *cred;

    if (mirror && !mirror->mirror_ds->ds_versions[0].tightly_coupled) {
        cred = ff_layout_get_mirror_cred(mirror, range->iomode);
        if (!cred)
            cred = get_cred(mdscred);
    } else {
        cred = get_cred(mdscred);
    }
    return cred;
}

/**
 * nfs4_ff_find_or_create_ds_client - Find or create a DS rpc client
 * @mirror: pointer to the mirror
 * @ds_clp: nfs_client for the DS
 * @inode: pointer to inode
 *
 * Find or create a DS rpc client with the MDS server's rpc client auth flavor
 * in the nfs_client cl_ds_clients list.
 */
struct rpc_clnt *
nfs4_ff_find_or_create_ds_client(struct nfs4_ff_layout_mirror *mirror,
                 struct nfs_client *ds_clp, struct inode *inode)
{
    switch (mirror->mirror_ds->ds_versions[0].version) {
    case 3:
        /* For NFSv3 DS, flavor is set when creating DS connections */
        return ds_clp->cl_rpcclient;
    case 4:
        return nfs4_find_or_create_ds_client(ds_clp, inode);
    default:
        BUG();
    }
}

void ff_layout_free_ds_ioerr(struct list_head *head)
{
    struct nfs4_ff_layout_ds_err *err;

    while (!list_empty(head)) {
        err = list_first_entry(head,
                struct nfs4_ff_layout_ds_err,
                list);
        list_del(&err->list);
        kfree(err);
    }
}

/* called with inode i_lock held */
int ff_layout_encode_ds_ioerr(struct xdr_stream *xdr, const struct list_head *head)
{
    struct nfs4_ff_layout_ds_err *err;
    __be32 *p;

    list_for_each_entry(err, head, list) {
        /* offset(8) + length(8) + stateid(NFS4_STATEID_SIZE)
         * + array length + deviceid(NFS4_DEVICEID4_SIZE)
         * + status(4) + opnum(4)
         */
        p = xdr_reserve_space(xdr,
                28 + NFS4_STATEID_SIZE + NFS4_DEVICEID4_SIZE);
        if (unlikely(!p))
            return -ENOBUFS;
        p = xdr_encode_hyper(p, err->offset);
        p = xdr_encode_hyper(p, err->length);
        p = xdr_encode_opaque_fixed(p, &err->stateid,
                        NFS4_STATEID_SIZE);
        /* Encode 1 error */
        *p++ = cpu_to_be32(1);
        p = xdr_encode_opaque_fixed(p, &err->deviceid,
                        NFS4_DEVICEID4_SIZE);
        *p++ = cpu_to_be32(err->status);
        *p++ = cpu_to_be32(err->opnum);
        dprintk("%s: offset %llu length %llu status %d op %d\n",
            __func__, err->offset, err->length, err->status,
            err->opnum);
    }

    return 0;
}

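/*
 * Move up to @maxnum recorded errors that intersect @range from the
 * layout's error list onto @head.  Returns the number of entries moved.
 */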
static
unsigned int do_layout_fetch_ds_ioerr(struct pnfs_layout_hdr *lo,
                      const struct pnfs_layout_range *range,
                      struct list_head *head,
                      unsigned int maxnum)
{
    struct nfs4_flexfile_layout *flo = FF_LAYOUT_FROM_HDR(lo);
    struct inode *inode = lo->plh_inode;
    struct nfs4_ff_layout_ds_err *err, *n;
    unsigned int ret = 0;

    spin_lock(&inode->i_lock);
    list_for_each_entry_safe(err, n, &flo->error_list, list) {
        if (!pnfs_is_range_intersecting(err->offset,
                pnfs_end_offset(err->offset, err->length),
                range->offset,
                pnfs_end_offset(range->offset, range->length)))
            continue;
        if (!maxnum)
            break;
        list_move(&err->list, head);
        maxnum--;
        ret++;
    }
    spin_unlock(&inode->i_lock);
    return ret;
}

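/*
 * Fetch up to @maxnum errors intersecting @range onto @head for
 * reporting.  If the cap is reached, any remaining intersecting
 * entries are discarded rather than left on the error list.
 */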
unsigned int ff_layout_fetch_ds_ioerr(struct pnfs_layout_hdr *lo,
                      const struct pnfs_layout_range *range,
                      struct list_head *head,
                      unsigned int maxnum)
{
    unsigned int ret;

    ret = do_layout_fetch_ds_ioerr(lo, range, head, maxnum);
    /* If we're over the max, discard all remaining entries */
    if (ret == maxnum) {
        LIST_HEAD(discard);
        do_layout_fetch_ds_ioerr(lo, range, &discard, -1);
        ff_layout_free_ds_ioerr(&discard);
    }
    return ret;
}

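/*
 * For reads a single working mirror is enough: return true if any
 * mirror's DS is, or may still turn out to be, available.
 */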
static bool ff_read_layout_has_available_ds(struct pnfs_layout_segment *lseg)
{
    struct nfs4_ff_layout_mirror *mirror;
    struct nfs4_deviceid_node *devid;
    u32 idx;

    for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
        mirror = FF_LAYOUT_COMP(lseg, idx);
        if (mirror) {
            if (!mirror->mirror_ds)
                return true;
            if (IS_ERR(mirror->mirror_ds))
                continue;
            devid = &mirror->mirror_ds->id_node;
            if (!nfs4_test_deviceid_unavailable(devid))
                return true;
        }
    }

    return false;
}

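/*
 * For writes every mirror must be usable, since writes are sent to all
 * mirrors: a single bad mirror makes the layout unusable.
 */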
static bool ff_rw_layout_has_available_ds(struct pnfs_layout_segment *lseg)
{
    struct nfs4_ff_layout_mirror *mirror;
    struct nfs4_deviceid_node *devid;
    u32 idx;

    for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
        mirror = FF_LAYOUT_COMP(lseg, idx);
        if (!mirror || IS_ERR(mirror->mirror_ds))
            return false;
        if (!mirror->mirror_ds)
            continue;
        devid = &mirror->mirror_ds->id_node;
        if (nfs4_test_deviceid_unavailable(devid))
            return false;
    }

    return FF_LAYOUT_MIRROR_COUNT(lseg) != 0;
}

static bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg)
{
    if (lseg->pls_range.iomode == IOMODE_READ)
        return ff_read_layout_has_available_ds(lseg);
    /* Note: RW layout needs all mirrors available */
    return ff_rw_layout_has_available_ds(lseg);
}

bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg)
{
    return ff_layout_no_fallback_to_mds(lseg) ||
           ff_layout_has_available_ds(lseg);
}

bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg)
{
    return lseg->pls_range.iomode == IOMODE_RW &&
           ff_layout_no_read_on_rw(lseg);
}

module_param(dataserver_retrans, uint, 0644);
MODULE_PARM_DESC(dataserver_retrans, "The number of times the NFSv4.1 client "
            "retries a request before it attempts further "
            "recovery action.");
module_param(dataserver_timeo, uint, 0644);
MODULE_PARM_DESC(dataserver_timeo, "The time (in tenths of a second) the "
            "NFSv4.1 client waits for a response from a "
            "data server before it retries an NFS request.");